Source code for pylablib.core.utils.rpyc_utils

"""Routines and classes related to RPyC package"""

from . import module as module_utils, net, library_parameters
library_parameters.library_parameters.update({"remote/rpyc/default_connect_config":{}},overwrite=False)

try:
    import rpyc
except ImportError as err:
    msg=(   "operation requires Python RPyC library. You can install it via PyPi as 'pip install rpyc'. "
            "If it is installed, check if it imports correctly by running 'import rpyc'")
    raise ImportError(msg) from err
import numpy as np

import encodings.idna  # pylint: disable=unused-import
import importlib
import struct
import pickle
import warnings
import socket


_default_packers={"numpy":np.ndarray.tostring,"pickle":pickle.dumps}
_default_unpackers={"pickle":pickle.loads}
def _is_tunnel_service(serv):
    return hasattr(serv,"tunnel_socket")
def _obtain_single(proxy, serv):
    if _is_tunnel_service(serv):
        loc_serv=serv.peer
        async_send=rpyc.async_(serv.tunnel_send)
        async_send(proxy,packer="pickle")
        data=pickle.loads(loc_serv.tunnel_recv())
        return data
    else:
        return rpyc.classic.obtain(proxy)


_numpy_block_size=int(2**20)
[docs] def obtain(proxy, serv=None, deep=False, direct=False): """ Obtain a remote netref object by value (i.e., copy it to the local Python instance). Wrapper around :func:`rpyc.utils.classic.obtain` with some special cases handling. `serv` specifies the current remote service. If it is of type :class:`SocketTunnelService`, use its socket tunnel for faster transfer. If ``deep==True`` and ``proxy`` is a container (tuple, list, or dict), run the function recursively for all its sub-elements. If ``direct==True``, directly use RPyC ``obtain`` method; otherwise use the custom method, which works better with large numpy arrays, but worse with composite types (e.g., lists). """ t=type(proxy) # each isinstance call is performed on the server, so getting type once is faster if deep and not direct and issubclass(t,tuple): # tuples are not passed as netrefs, so they need to be checked first return tuple([obtain(v,serv=serv) for v in proxy]) if not issubclass(t,rpyc.BaseNetref): return proxy if direct: return rpyc.classic.obtain(proxy) if deep: if isinstance(proxy,list): return [obtain(v,serv=serv) for v in proxy] if isinstance(proxy,dict): return {obtain(k,serv=serv):obtain(v,serv=serv) for k,v in proxy.items()} if isinstance(proxy,np.ndarray) or (t.__name__=="numpy.ndarray" and all([hasattr(proxy,a) for a in ["shape","dtype","tostring","flatten"]])): elsize=np.prod(proxy.shape,dtype="u8") bytesize=proxy.dtype.itemsize*elsize if bytesize>_numpy_block_size: if _is_tunnel_service(serv): loc_serv=serv.peer async_send=rpyc.async_(serv.tunnel_send) async_send(proxy,packer="numpy") data=loc_serv.tunnel_recv() return np.frombuffer(data,dtype=proxy.dtype.str).reshape(proxy.shape) else: fproxy=proxy.flatten() loc=np.zeros(elsize,dtype=proxy.dtype.str) block_size=_numpy_block_size//proxy.dtype.itemsize for pos in range(0,elsize,block_size): loc[pos:pos+block_size]=rpyc.classic.obtain(fproxy[pos:pos+block_size]) return loc.reshape(proxy.shape) return rpyc.classic.obtain(proxy)
[docs] def transfer(obj, serv): """ Send a local object to the remote PC by value (i.e., copy it to the remote Python instance). A 'reversed' version of :func:`obtain`. """ return serv.transfer(obj)
def _get_conn_address(conn, peer=False): """Get connection IP address""" s=socket.fromfd(conn.fileno(),socket.AF_INET,socket.SOCK_STREAM) addr=s.getpeername()[0] if peer else s.getsockname()[0] s.close() return addr
[docs] class SocketTunnelService(rpyc.SlaveService): """ Extension of the standard :class:`rpyc.core.service.SlaveService` with built-in network socket tunnel for faster data transfer. In order for the tunnel to work, services on both ends need to be subclasses of :class:`SocketTunnelService`. Because of the initial setup protocol, the two services are asymmetric: one should be 'server' (corresponding to the listening server), and one should be 'client' (external connection). The roles are decided by the `server` constructor parameter. """ _tunnel_block_size=int(2**20) def __init__(self, server=False): rpyc.SlaveService.__init__(self) self.server=server _default_tunnel_timeout=10. def _recv_socket(self, addr): """Set up a listener to receive a socket connection from the other service""" def listen(s): s.set_timeout(self._default_tunnel_timeout) self.tunnel_socket=s remote_call=rpyc.async_(self._conn.root._send_socket) def port_func(port): remote_call(addr,port) net.listen(addr,0,listen,port_func=port_func,timeout=self._default_tunnel_timeout,connections_number=1,socket_kwargs={"nodelay":True}) def _send_socket(self, dst_addr, dst_port): """Set up a client socket to connect to the other service""" self.tunnel_socket=net.ClientSocket(timeout=self._default_tunnel_timeout,nodelay=True) self.tunnel_socket.connect(dst_addr,dst_port)
[docs] def tunnel_send(self, obj, packer=None): """ Send data through the socket tunnel. If `packer` is not ``None``, it defines a function to convert `obj` to a bytes string. """ packer=_default_packers.get(packer,packer) if packer: obj=packer(obj) nchunks=(len(obj)-1)//self._tunnel_block_size+1 self.tunnel_socket.send_fixedlen(struct.pack(">I",nchunks)) for pos in range(0,len(obj),self._tunnel_block_size): self.tunnel_socket.send_decllen(obj[pos:pos+self._tunnel_block_size])
[docs] def tunnel_recv(self, unpacker=None): """ Receive data sent through the socket tunnel. If `unpacker` is not ``None``, it defines a function to convert the received bytes string into an object. """ nchunks,=struct.unpack(">I",self.tunnel_socket.recv_fixedlen(4)) chunks=[] for _ in range(nchunks): chunks.append(self.tunnel_socket.recv_decllen()) obj=b"".join(chunks) unpacker=_default_unpackers.get(unpacker,unpacker) return unpacker(obj) if unpacker else obj
[docs] def obtain(self, proxy): """Execute :func:`obtain` on the local instance""" return obtain(proxy,self)
[docs] def transfer(self, obj): """Execute :func:`transfer` on the local instance""" return self.peer.obtain(obj)
[docs] def on_connect(self, conn): rpyc.SlaveService.on_connect(self,conn) self.peer=conn.root if not self.server: self._recv_socket(_get_conn_address(conn))
[docs] def on_disconnect(self, conn): try: self.tunnel_socket.close() except AttributeError: pass rpyc.SlaveService.on_disconnect(self,conn)
[docs] class DeviceService(SocketTunnelService): """ Device RPyC service. Expands on :class:`SocketTunnelService` by adding :meth:`get_device` method, which opens local devices, tracks them, and closes them automatically on disconnect. """ def __init__(self, verbose=False): SocketTunnelService.__init__(self,server=True) self.verbose=verbose self.devices=[]
[docs] def on_connect(self, conn): SocketTunnelService.on_connect(self,conn) self.devices=[] if self.verbose: conn_addr=_get_conn_address(self._conn,peer=True) conn_host=net.get_remote_hostname(conn_addr) or "unknown" print("Connected client {} from {}, IP {}".format(self._conn,conn_host,conn_addr))
[docs] def on_disconnect(self, conn): if self.verbose and self.devices: print("Closing devices {} from client {}".format(self.devices,self._conn)) for dev in self.devices: try: dev.close() except: # pylint: disable=bare-except pass self.devices=[] if self.verbose: print("Disconnected client {}".format(self._conn)) SocketTunnelService.on_disconnect(self,conn)
[docs] def get_device_class(self, cls): """ Get remote device class. `cls` is the full class name, including the module within ``pylablib.devices`` (e.g., ``Attocube.ANC300``). """ if self.verbose: print("Requesting device class {} from client {}".format(cls,self._conn)) module,cls=cls.rsplit(".",maxsplit=1) try: module=importlib.import_module(module) except ImportError: module=importlib.import_module(module_utils.get_library_name()+".devices."+module) module._rpyc=True return getattr(module,cls)
[docs] def get_device(self, cls, *args, **kwargs): """ Connect to a device. `cls` is the full class name, including the module within ``pylablib.devices`` (e.g., ``Attocube.ANC300``). Stores reference to the connected device and closes it automatically on disconnect. """ cls=self.get_device_class(cls) dev=cls(*args,**kwargs) self.devices.append(dev) return dev
[docs] def run_device_service(port=18812, verbose=False): """Start :class:`DeviceService` at the given port""" if verbose: hostips=net.get_all_local_addr() hostnames=net.get_local_hostname(full=False),net.get_local_hostname(full=True) hostips_list=", ".join(["{}:{}".format(ip,port) for ip in hostips]) print("Running device service at {} ({}), IP {}".format(hostnames[0],hostnames[1],hostips_list)) rpyc.ThreadedServer(rpyc.utils.helpers.classpartial(DeviceService,verbose=verbose),port=port).start()
[docs] def connect_device_service(addr, port=18812, timeout=3, attempts=2, error_on_fail=True, config=None): """ Connect to the :class:`DeviceService` running at the given address and port `timeout` and `attempts` define respectively timeout of a single connection attempt, and the number of attempts (RPyC default is 3 seconds timeout and 6 attempts). If ``error_on_fail==True``, raise error if the connection failed; otherwise, return ``None`` """ addr,port=net.as_addr_port(addr,port) with warnings.catch_warnings(): warnings.simplefilter("ignore") try: s=rpyc.SocketStream.connect(addr,port,timeout=timeout,attempts=attempts) config=library_parameters.library_parameters["remote/rpyc/default_connect_config"] if config is None else config return rpyc.connect_stream(s,SocketTunnelService,config=config).root except net.socket.timeout: if error_on_fail: raise return None