from .base import GenericVoltcraftError, GenericVoltcraftBackendError
from ...core.utils import py3
from ...core.devio import SCPI, comm_backend, interface
import re
import struct
import collections
import numpy as np
[docs]
class VC7055(SCPI.SCPIDevice):
"""
Voltcraft VC-7055BT bench-top multimeter.
Args:
addr: device connection (usually a COM-port name such as ``"COM1"``).
"""
Error=GenericVoltcraftError
ReraiseError=GenericVoltcraftBackendError
def __init__(self, addr):
backend_defaults={"serial":("COM1",115200,8,'N',1)}
super().__init__(addr,backend_defaults=backend_defaults)
with self._close_on_error():
self.get_id()
self._add_settings_variable("functions",lambda: self.get_function("all"), lambda v: self.set_function(v,channel="all"),multiarg=False)
self._add_settings_variable("range",self.get_range,self.set_range)
self._add_settings_variable("autorange",self.is_autorange_enabled,self.enable_autorange)
self._add_settings_variable("measurement_rate",self.get_measurement_rate,self.set_measurement_rate)
self._add_status_variable("readings",lambda: self.get_reading("all"))
def _read_echo(self, delay=0.):
self.sleep(delay)
self.instr.read(1)
self.sleep(1E-2)
self.instr.read()
_p_function=interface.EnumParameterClass("function",
{"volt_dc":"VOLT:DC","volt_ac":"VOLT:AC","curr_dc":"CURR:DC","curr_ac":"CURR:AC","cap":"CAP","res":"RES","fres":"FRES",
"freq":"FREQ","per":"PER","cont":"CONT","diode":"DIOD","temp":"TEMP","none":"NONE"})
def _normalize_function(self, func):
func=func.strip('"').upper().split()
if len(func)==1:
if func[0] in ["CURR","VOLT"]:
func=func+["DC"]
return ":".join(func)
_p_reading_channel=interface.EnumParameterClass("reading_channel",{"primary":1,"secondary":2})
_reading_channels=["primary","secondary"]
@interface.use_parameters(_returns="function",channel="reading_channel")
def _get_single_function(self, channel):
return self._normalize_function(self.ask("FUNC{}?".format(channel)))
[docs]
def get_function(self, channel="primary"):
"""Get measurement function for the given measurement channel (``"primary"`` or ``"secondary"``, or ``"all"`` for both channels)"""
if channel=="all":
return tuple(self._get_single_function(k) for k in self._reading_channels)
return self._get_single_function(channel)
@interface.use_parameters(function="function",channel="reading_channel")
def _set_single_function(self, function, channel):
if self._wap._get_single_function(channel)!=function:
if channel==1:
self.write("CONF:{}".format(function),read_echo=True)
else:
self.write("FUNC2",'"{}"'.format(function),read_echo=True)
return self._wip._get_single_function(channel=channel)
[docs]
def set_function(self, function, channel="primary", reset_secondary=True):
"""
Set measurement function for the given measurement channel (``"primary"``, ``"secondary"``, or ``"all"`` for both channels).
If ``reset_secondary==True`` and the primary function is changed, set the secondary function to ``"none"`` to avoid conflicts.
"""
if channel=="all" or isinstance(function,(tuple,list)):
self._set_single_function("none","secondary")
return tuple(self._set_single_function(f,k) for f,k in zip(function,self._reading_channels))
if reset_secondary and channel=="primary" and self.get_function()!=function:
self._set_single_function("none","secondary")
return self._set_single_function(function,channel)
_range_indices={"VOLT:DC":[50E-3,500E-3,5,50,500,1E3],"VOLT:AC":[500E-3,5,50,500,750],
"CURR:DC":[500E-6,5E-3,50E-3,500E-3,5,10],"CURR:AC":[500E-6,5E-3,50E-3,500E-3,5,10],
"RES":[500,5E3,50E3,500E3,5E6,50E6],"FRES":[500,5E3,50E3],"CAP":[50E-9,500E-9,5E-6,50E-6,500E-6,5E-3,50E-3]}
_si_pfx={"G":1E9,"M":1E6,"k":1E3,"K":1E3,"":1,"m":1E-3,"u":1E-6,"n":1E-9,"p":1E-12}
[docs]
def get_range(self):
"""Get the present measurement range"""
f=self._wop._get_single_function("primary")
if f not in self._range_indices:
return None
rng=self.ask("RANGE?","raw").strip()[:-1]
if rng[-2:]==b"\xa6\xcc": # micro-range glitch for capacitance
v,p=rng[:-2],"u"
else:
m=re.match(r"(\d+)\s*(G|M|k|K||m|u|n|p)",py3.as_str(rng))
if m is None:
raise GenericVoltcraftError("unrecognized range value: {}".format(rng))
v,p=m.groups()
return float(v)*self._si_pfx[p]
[docs]
def set_range(self, rng):
"""Set the present measurement range"""
f=self._wop._get_single_function("primary")
if f not in self._range_indices:
return None
rngvals=np.array(self._range_indices[f])
closest_idx=abs(np.log(rngvals)-np.log(rng)).argmin() if rng>0 else 0
self.write("RANGE",closest_idx+1,"int",read_echo=True,read_echo_delay=0.1)
return self.get_range()
[docs]
def is_autorange_enabled(self):
"""Check if autoscaling is enabled"""
return self.ask("AUTO?","bool")
[docs]
def enable_autorange(self, enable=True):
"""Enable or disable autoscaling"""
if enable:
self.write("AUTO",read_echo=True,read_echo_delay=0.1)
else:
self.set_range(self.get_range())
return self.is_autorange_enabled()
_p_measurement_rate=interface.EnumParameterClass("measurement_rate",{"fast":"F","med":"M","slow":"S"})
[docs]
@interface.use_parameters(_returns="measurement_rate")
def get_measurement_rate(self):
"""Get measurement update rate (``"fast""``, ``"med"``, or ``"slow"``)"""
return self.ask("RATE?")
[docs]
@interface.use_parameters(rate="measurement_rate")
def set_measurement_rate(self, rate):
"""Set measurement update rate (``"fast""``, ``"med"``, or ``"slow"``)"""
if self._wop.get_measurement_rate()!=rate:
self.write("RATE",rate,read_echo=True,read_echo_delay=0.2)
return self.get_measurement_rate()
@interface.use_parameters(channel="reading_channel")
def _get_single_reading(self, channel):
if channel==2 and self.get_function("secondary")=="none":
return None
return self.ask("MEAS{}?".format(channel),"float")
[docs]
def get_reading(self, channel="primary"):
"""Return the latest reading of the given measurement channel (``"primary"``, ``"secondary"``, or ``"all"`` for both channels)"""
if channel=="all":
return tuple(self._get_single_reading(k) for k in self._reading_channels)
return self._get_single_reading(channel)
[docs]
class VC880ParseError(GenericVoltcraftError):
"""Voltcraft VC880 message parse error"""
TVC880Reading=collections.namedtuple("TVC880Reading",["func","kind","value","unit","disps","d2func"])
[docs]
class VC880(comm_backend.ICommBackendWrapper):
"""
Voltcraft VC880/VC650BT series multimeter.
Args:
conn: device connection (usually, either a HID path, or an integer 0-based index indicating the devices among the ones connected)
"""
Error=GenericVoltcraftError
def __init__(self, conn=0):
if isinstance(conn,int):
hid_devices=comm_backend.list_backend_resources("hid",desc=True)
vc_devices=[dev for dev in hid_devices if (dev.vendor_id,dev.product_id)==(0x10C4,0xEA80)]
if conn>=len(vc_devices):
raise GenericVoltcraftError("could not find devices with index {}; {} devices available".format(conn,len(vc_devices)))
path=vc_devices[conn].path
else:
path=conn
instr=comm_backend.new_backend(path,"hid",defaults={"hid":("rep_fmt","lenpfx")},timeout=3.,reraise_error=GenericVoltcraftBackendError)
super().__init__(instr)
self._add_status_variable("reading",self.get_reading)
_header_magic=b"\xAB\xCD"
TMessage=collections.namedtuple("TMessage",["typ","payload"])
def _read_single_message(self):
hdr=self.instr.read(4)
for _ in range(100):
if hdr[:2]==self._header_magic:
break
hdr=hdr[1:]+self.instr.read(1)
if hdr[:2]!=self._header_magic:
raise VC880ParseError("could not find the header 0x{:02X}{:02X}".format(*self._header_magic))
l,typ=struct.unpack("BB",hdr[2:])
if l<3:
raise VC880ParseError("error in the received message length: expect at least 3 bytes, got {}".format(l))
msg_pending=self.instr.get_pending()>l-1
msg=self.instr.read(l-1)
payload=msg[:-2]
rcsum,=struct.unpack(">H",msg[-2:])
ccsum=sum(hdr)+sum(payload)
if ccsum!=rcsum:
raise VC880ParseError("error in the received message check sum: received at 0x{:04X}, calculated 0x{:04X}".format(rcsum,ccsum))
return self.TMessage(typ,payload),msg_pending
[docs]
def read_message(self, tries=10):
"""Read the oldest message in the queue"""
for t in range(tries):
try:
return self._read_single_message()[0]
except VC880ParseError:
if t==tries-1:
raise
[docs]
def exhaust_messages(self, nmax=100000, tries=10):
"""
Read all messages in the queue and return them
`nmax` specifies the maximal number of messages to read (``None`` means reading until available).
"""
received=[]
t=0
while nmax is None or len(received)<nmax:
try:
msg,pending=self._read_single_message()
received.append(msg)
if not pending:
break
t=0
except VC880ParseError:
t+=1
if t==tries:
break
return received
def _build_message(self, comm, data=b""):
hdr=self._header_magic+struct.pack("BB",len(data)+3,comm)
csum=struct.pack(">H",sum(hdr)+sum(data))
return hdr+data+csum
[docs]
def send_message(self, comm, data=b"", pre_exhaust=True, reps=1, post_read=0):
"""
Send a message containing the given command and data.
If ``pre_exhaust==True``, empty the read queue before sending the message (improves chances of delivery).
`reps` specifies the number of exhaust/send cycle repetitions (improves chances of delivery).
If `post_read` is more than 0, it specifies the number of messages to read after the command is sent.
"""
for _ in range(reps):
if pre_exhaust:
self.exhaust_messages()
self.instr.write(self._build_message(comm,data=data))
return [self.read_message() for _ in range(post_read)]
_functions={
0x00:("DCV","V","V","volt_dc"), # function, units, range_kind, function_kind
0x01:("ACDCV","V","V","volt_acdc"),
0x02:("DCmV","V","mV","volt_dc"),
0x03:("freq","Hz","Hz","freq"),
0x04:("duty_cycl","perc","perc","duty_cycl"),
0x05:("ACV","V","V","volt_ac"),
0x06:("res","Ohm","Ohm","res"),
0x07:("diod","V","V","diod"),
0x08:("short","Ohm","Ohm","short"),
0x09:("cap","F","F","cap"),
0x0A:("t_cels","dC","dC","temp"),
0x0B:("t_fahr","dF","dF","temp"),
0x0C:("DCuA","A","uA","curr_dc"),
0x0D:("ACuA","A","uA","curr_ac"),
0x0E:("DCmA","A","mA","curr_dc"),
0x0F:("ACmA","A","mA","curr_ac"),
0x10:("DCA","A","A","curr_dc"),
0x11:("ACA","A","A","curr_ac"),
0x12:("low_pass","V","V","low_pass")}
_ranges={ "V":[4,40,400,1000],
"mV":[4e-1],
"A":[10],
"mA":[4E-2,4E-1],
"uA":[4E-4,4E-3],
"Ohm":[4E2,4E3,4E4,4E5,4E6,4E7],
"F":[4E-8,4E-7,4E-6,4E-5,4E-4,4E-3,4E-2],
"Hz":[4E1,4E2,4E3,4E4,4E5,4E6,4E7,4E8]}
def _decode_live(self, data):
func,rng=data[:2]
if func not in self._functions:
raise GenericVoltcraftError("unrecognized function index: {:02X}".format(func))
func,unit,rngk,kind=self._functions[func]
rng=self._ranges.get(rngk,[1])[rng-0x30]
pfxord=int(np.log10(rng*.99)//3)
val=py3.as_str(data[2:9]).strip()
stat=list(data[26:33])
def parse_val(val):
if val=="-"*len(val):
return None
elif val.upper().find("OL")>=0:
return"over"
else:
return float(val)*10**(pfxord*3)
val=parse_val(val)
disps=[]
for ds,de,c in [(9,16,parse_val),(16,23,int),(23,26,int)]:
d=py3.as_str(data[ds:de]).strip()
try:
d=c(d) if d else None
except ValueError:
pass
disps.append(d)
d2func=None
if stat[1]&0x08:
d2func="max"
elif stat[1]&0x04:
d2func="min"
elif stat[1]&0x02:
d2func="avg"
elif stat[1]&0x01:
d2func="rel"
return TVC880Reading(func,kind,val,unit,tuple(disps),d2func)
_p_reading_kind=interface.EnumParameterClass("reading_kind",["latest","oldest","all"])
[docs]
@interface.use_parameters(kind="reading_kind")
def get_reading(self, kind="latest"):
"""
Get the multimeter reading.
`kind` can be ``"latest"`` (return the most recent reading), ``"oldest"`` (return the oldest reading),
or ``"all"`` (return all readings in the read queue).
Return tuple ``(func, kind, val, unit, disps, d2func)`` with, correspondingly, specific selected function (e.g., ``"DCuA"`` or ``"res"``),
function kind (e.g., ``"curr_dc"`` or ``"res"``), displayed value (in SI units), value units (e.g., ``"V"`` or ``"Ohm"``),
values of the other 3 auxiliary displays (upper right min/max/avg/rel display, upper left memory display, bottom linear scale display),
and the kind of function on the upper right display (``"min"``, ``"max"``, ``"avg"``, or ``"rel"``).
"""
if kind=="all":
return [self._decode_live(m.payload) for m in self.exhaust_messages() if m.type==0x01]
if kind=="latest":
msgs=self.exhaust_messages()
for m in msgs[::-1]:
if m.typ==0x01:
return self._decode_live(m.payload)
for _ in range(100):
m=self.read_message()
if m.typ==0x01:
return self._decode_live(m.payload)
raise GenericVoltcraftError("could not receive a live update message")
[docs]
def enable_autorange(self, enable=True):
"""Enable or disable autorange"""
if enable:
self.send_message(0x47,reps=3)
else:
self.send_message(0x46,reps=1)