from . import threadprop
from .synchronizing import QThreadNotifier, QMultiThreadNotifier
from ..utils import funcargparse, functions as func_utils
import threading
import time
import collections
### Remote call results ###
[docs]
class QCallResultSynchronizer(QThreadNotifier):
[docs]
def get_progress(self):
"""
Get the progress of the call execution.
Can be ``"waiting"`` (call is not done executing), ``"done"`` (call done successfully),
``"fail"`` (call failed, probably due to thread being stopped), ``"skip"`` (call was skipped),
or ``"exception"`` (call raised an exception).
"""
value=self.value
if value is None:
return "waiting"
tag=value[0]
if tag=="result":
return "done"
return tag
[docs]
def is_call_done(self):
"""Check if the call is done"""
return self.get_progress()=="done"
[docs]
def skipped(self):
"""Check if the call was skipped"""
return self.get_progress()=="skip"
[docs]
def failed(self):
"""Check if the call failed"""
return self.get_progress()=="fail"
[docs]
def get_value_sync(self, timeout=None, default=None, error_on_fail=True, error_on_skip=True, pass_exception=True): # pylint: disable=arguments-differ
"""
Wait (with the given `timeout`) for the value passed by the notifier
If ``error_on_fail==True`` and the controlled thread notifies of a fail (usually, if it's stopped before it executed the call),
raise :exc:`.threadprop.NoControllerThreadError`; otherwise, return `default`.
If ``error_on_skip==True`` and the call was skipped (e.g., due to full call queue), raise :exc:`.threadprop.SkippedCallError`; otherwise, return `default`.
If ``pass_exception==True`` and the returned value represents exception, re-raise it in the caller thread; otherwise, return `default`.
"""
res=super().get_value_sync(timeout=timeout)
if res is not None:
kind,value=res # pylint: disable=unpacking-non-sequence
if kind=="result":
return value
elif kind=="exception":
if pass_exception:
raise value
else:
return default
elif kind=="skip":
if error_on_skip:
raise threadprop.SkippedCallError()
return default
elif kind=="fail":
if error_on_fail:
raise threadprop.NoControllerThreadError("failed executing remote call: controller is stopped")
return default
else:
raise ValueError("unrecognized return value kind: {}".format(kind))
else:
if error_on_fail:
raise threadprop.TimeoutThreadError
return default
[docs]
class QDummyResultSynchronizer:
"""Dummy result synchronizer for call which don't require result synchronization (e.g., multicasts)"""
_not_synchronizer=True
[docs]
def notify(self, value):
pass
dummy_synchronizer=QDummyResultSynchronizer()
[docs]
class QDirectResultSynchronizer:
"""
Result "synchronizer" for direct calls.
Behaves as a regular result synchronizer with an already executed call.
"""
def __init__(self, value):
self.value=value
[docs]
def get_progress(self):
"""Get the progress of the call execution (always return ``"done"``)"""
return "done"
[docs]
def is_call_done(self):
"""Check if the call is done (always return ``True``)"""
return True
[docs]
def skipped(self):
"""Check if the call was skipped (always return ``False``)"""
return False
[docs]
def failed(self):
"""Check if the call failed (always return ``False``)"""
return False
[docs]
def get_value(self):
"""Return stored value"""
return self.value
[docs]
def get_value_sync(self, timeout=None, default=None, error_on_fail=True, error_on_skip=True, pass_exception=True): # pylint: disable=unused-argument
"""
Return stored value.
Parameters are only for compatibility with :class:`QCallResultSynchronizer`.
"""
return self.value
[docs]
def wait(self, *args, **kwargs):
"""Do nothing (present only for compatibility with :class:`QCallResultSynchronizer`)"""
[docs]
def notify(self, *args, **kwargs):
"""Do nothing (present only for compatibility with :class:`QCallResultSynchronizer`)"""
[docs]
def waiting(self):
"""Check if waiting is in progress (always return ``False``)"""
return False
[docs]
def done_wait(self):
"""Check if waiting is done (always return ``True``)"""
return True
[docs]
def success_wait(self):
"""Check if waiting is done successfully (always return ``True``)"""
return True
[docs]
def done_notify(self):
"""Check if notifying is done (always return ``True``)"""
return True
[docs]
def waiting_state(self):
return "done"
[docs]
def notifying_state(self):
return "done"
### Remote call ###
[docs]
class QScheduledCall:
"""
Object representing a scheduled remote call.
Can be executed, skipped, or failed in the target thread, in which case it notifies the result synchronizer (if supplied).
Args:
func: callable to be invoked in the destination thread
args: arguments to be passed to `func`
kwargs: keyword arguments to be passed to `func`
silent: if ``True``, silence the exception in the execution thread and simply pass it to the caller thread;
otherwise, the exception is raised in both threads
result_synchronizer: result synchronizer object; can be ``None`` (create new :class:`QCallResultSynchronizer`),
``"async"`` (no result synchronization), or a :class:`QCallResultSynchronizer` object.
"""
Callback=collections.namedtuple("Callback",["func","pass_result","call_on_exception","call_on_unschedule"])
def __init__(self, func, args=None, kwargs=None, silent=False, result_synchronizer=None):
self.func=func
self.args=args or []
self.kwargs=kwargs or {}
self.silent=silent
if result_synchronizer=="async":
result_synchronizer=dummy_synchronizer
elif result_synchronizer is None:
result_synchronizer=QCallResultSynchronizer()
self.result_synchronizer=result_synchronizer
self.callbacks=[]
self._notified=[0] # hack to avoid use of locks ([0] is False, [] is True, use .pop() to atomically check and change)
self.state="wait"
def _check_notified(self):
try:
self._notified.pop()
return False
except IndexError:
return True
[docs]
def execute(self, silent=None):
"""Execute the call and notify the result synchronizer (invoked by the destination thread)"""
if self._check_notified():
return
res=("fail",None)
try:
res=("result",self.func(*self.args,**self.kwargs))
except Exception as e: # pylint: disable=broad-except
res=("exception",e)
if not (self.silent if silent is None else silent):
raise
finally:
self.state=res[0]
for c in self.callbacks:
if c.call_on_exception or c.call_on_unschedule or res[0]=="result":
if c.pass_result:
c.func(res[1] if res[0]=="result" else None)
else:
c.func()
self.result_synchronizer.notify(res)
[docs]
def add_callback(self, callback, pass_result=True, call_on_exception=False, call_on_unschedule=False, front=False):
"""
Set the callback to be executed after the main call is done.
If ``pass_result==True``, pass function result to the callback (or ``None`` if call failed); otherwise, pass no arguments.
If ``call_on_exception==True``, call it even if the original call raised an exception.
If ``call_on_unschedule==True``, call it for any call unscheduling event, including using :meth:`skip` or :meth:`fail` methods
(this effectively ignores `call_on_exception`, since the callback is called regardless of the exception).
If ``front==True``, add the callback in the front of the line (executes first).
"""
cb=self.Callback(callback,pass_result,call_on_exception,call_on_unschedule)
if front:
self.callbacks.insert(0,cb)
else:
self.callbacks.append(cb)
[docs]
def fail(self):
"""Notify that the call is failed (invoked by the destination thread)"""
if self._check_notified():
return
self.state="fail"
for c in self.callbacks:
if c.call_on_unschedule:
c.func()
self.result_synchronizer.notify(("fail",None))
[docs]
def skip(self):
"""Notify that the call is skipped (invoked by the destination thread)"""
if self._check_notified():
return
self.state="skip"
for c in self.callbacks:
if c.call_on_unschedule:
c.func()
self.result_synchronizer.notify(("skip",None))
### Call schedulers ###
TDefaultCallInfo=collections.namedtuple("TDefaultCallInfo",["call_time"])
[docs]
class QScheduler:
"""
Generic call scheduler.
Two methods are used by the external scheduling routines: :meth:`build_call` to create a :class:`QScheduledCall` with appropriate parameters,
and :meth:`schedule`, which takes a call and schedules it.
The :meth:`schedule` method should return ``True`` if the scheduling was successful (at least, for now), and ``False`` otherwise.
Args:
call_info_argname: if not ``None``, supplies a name of a keyword argument
via which call info (generated by :meth:`build_call_info`) is passed on function call
"""
def __init__(self, call_info_argname=None):
self.call_info_argname=call_info_argname
[docs]
def build_call_info(self):
"""Build call info tuple which can be passed to scheduled calls"""
return TDefaultCallInfo(time.time())
[docs]
def build_call(self, func, args=None, kwargs=None, callback=None, pass_result=True, callback_on_exception=True, sync_result=True):
"""
Build :class:`QScheduledCall` for subsequent scheduling.
Args:
func: function to be called
args: arguments to be passed to `func`
kwargs: keyword arguments to be passed to `func`
callback: optional callback to be called when `func` is done
pass_result (bool): if ``True``, pass `func` result as a single argument to the callback; otherwise, give no arguments
callback_on_exception (bool): if ``True``, execute the callback on call fail or skip (if it requires an argument, ``None`` is supplied);
otherwise, only execute it if the call was successful
sync_result: if ``True``, the call has a default result synchronizer; otherwise, no synchronization is made.
"""
result_synchronizer=None if sync_result else "async"
scheduled_call=QScheduledCall(func,args,kwargs,result_synchronizer=result_synchronizer)
if self.call_info_argname:
scheduled_call.kwargs[self.call_info_argname]=self.build_call_info()
if callback is not None:
scheduled_call.add_callback(callback,pass_result=pass_result,call_on_exception=callback_on_exception)
return scheduled_call
[docs]
def schedule(self, call): # pylint: disable=unused-argument
"""Schedule the call"""
return False
[docs]
def clear(self):
"""Clear the scheduler"""
pass
[docs]
class QDirectCallScheduler(QScheduler):
"""
Simplest call scheduler: directly executes the calls on scheduling in the scheduling thread.
Args:
call_info_argname: if not ``None``, supplies a name of a keyword argument
via which call info (generated by :meth:`QScheduler.build_call_info`) is passed on function call
"""
[docs]
def build_call(self, func, args=None, kwargs=None, callback=None, pass_result=True, callback_on_exception=True, sync_result=False):
return super().build_call(func,args=args,kwargs=kwargs,
callback=callback,pass_result=pass_result,callback_on_exception=callback_on_exception,sync_result=sync_result)
[docs]
def schedule(self, call):
call.execute()
return True
[docs]
class QQueueScheduler(QScheduler):
"""
Call scheduler with a builtin call queue.
Supports placing the calls and retrieving them (from the destination thread).
Has ability to skip some calls if, e.g., the queue is too full. Whether the call should be skipped is determined
by :meth:`can_schedule` (should be overloaded in subclasses).
Used as a default command scheduler.
Args:
on_full_queue: action to be taken if the call can't be scheduled (i.e., :meth:`can_schedule` returns ``False``); can be
``"skip_current"`` (skip the call which is being scheduled),
``"skip_newest"`` (skip the most recent call; place the current)
``"skip_oldest"`` (skip the oldest call in the queue; place the current),
``"call_current"`` (execute the call which is being scheduled immediately in the caller thread),
``"call_newest"`` (execute the most recent call immediately in the caller thread),
``"call_oldest"`` (execute the oldest call in the queue immediately in the caller thread), or
``"wait"`` (wait until the call can be scheduled, which is checked after every call removal from the queue; place the call)
call_info_argname: if not ``None``, supplies a name of a keyword argument
via which call info (generated by :meth:`QScheduler.build_call_info`) is passed on function call
Methods to overload:
- :meth:`can_schedule`: check if the call can be scheduled
- :meth:`call_added`: called when a new call has been added to the queue
- :meth:`call_popped`: called when a call has been removed from the queue (either for execution, or for skipping)
"""
def __init__(self, on_full_queue="skip_current", call_info_argname=None):
super().__init__(call_info_argname=call_info_argname)
self.call_queue=collections.deque()
funcargparse.check_parameter_range(on_full_queue,"on_full_queue",{"skip_current","skip_newest","skip_oldest","call_current","call_newest","call_oldest","wait"})
self.on_full_queue=on_full_queue
self.lock=threading.Lock()
self.call_popped_notifier=QMultiThreadNotifier() if on_full_queue=="wait" else None
self.working=True
self._last_popped=[None]
[docs]
def can_schedule(self, call): # pylint: disable=unused-argument
"""Check if the call can be scheduled"""
return True
[docs]
def call_added(self, call):
"""Called whenever `call` has been added to the queue"""
[docs]
def call_popped(self, call, idx):
"""
Called whenever `call` has been removed from the queue
`idx` determines the call position within the queue.
"""
def _add_call(self, call):
self.call_queue.append(call)
self.call_added(call)
def _pop_call(self, head=False):
try:
call=self.call_queue.popleft() if head else self.call_queue.pop()
if self.call_popped_notifier is not None:
self.call_popped_notifier.notify()
self.call_popped(call,0 if head else -1)
self._last_popped=[self._last_popped[-1],call]
return call
except IndexError:
return None
[docs]
def schedule(self, call):
"""Schedule a call"""
if not self.working:
call.fail()
return
if self.on_full_queue=="wait":
while True:
with self.lock:
if self.can_schedule(call):
self._add_call(call)
return True
elif not self.working:
return
wait_n=self.call_popped_notifier.wait(-1)
self.call_popped_notifier.wait(wait_n)
scheduled=True
skipped_call=None
execute_call=None
with self.lock:
if self.can_schedule(call):
self._add_call(call)
elif self.on_full_queue=="skip_newest":
skipped_call=self._pop_call()
self._add_call(call)
elif self.on_full_queue=="skip_oldest":
skipped_call=self._pop_call(head=True)
self._add_call(call)
elif self.on_full_queue=="skip_current":
skipped_call=call
scheduled=False
elif self.on_full_queue=="call_newest":
execute_call=self._pop_call()
self._add_call(call)
elif self.on_full_queue=="call_oldest":
execute_call=self._pop_call(head=True)
self._add_call(call)
elif self.on_full_queue=="call_current":
execute_call=call
else:
scheduled=False
if skipped_call is not None:
skipped_call.skip()
if execute_call is not None:
execute_call.execute()
return scheduled
[docs]
def pop_call(self):
"""
Pop the call from the queue head.
If the queue is empty, return ``None``
"""
with self.lock:
return self._pop_call(head=True)
[docs]
def unschedule(self, call):
"""
Unschedule a given call.
Designed for joint queue operation, so the call is not notified (assume that it has been already notified elsewhere).
"""
if call in self._last_popped:
return False
with self.lock:
try:
idx=self.call_queue.index(call)
del self.call_queue[idx]
if self.call_popped_notifier is not None:
self.call_popped_notifier.notify()
self.call_popped(call,idx)
return True
except ValueError:
return False
[docs]
def has_calls(self):
"""Check if there are queued calls"""
return bool(self.call_queue)
def __len__(self):
return len(self.call_queue)
[docs]
def clear(self, close=True): # pylint: disable=arguments-differ
"""
Clear the call queue.
If ``close==True``, mark the queue as closed (any attempt to schedule more calls fails automatically) and fail all calls in the queue;
otherwise, skip all calls currently in the queue.
"""
if close:
self.working=False
with self.lock:
all_calls=[]
c=self._pop_call(head=True)
while c is not None:
all_calls.append(c)
c=self._pop_call()
for c in all_calls:
if close:
c.fail()
else:
c.skip()
if self.call_popped_notifier is not None:
self.call_popped_notifier.notify()
[docs]
class QQueueLengthLimitScheduler(QQueueScheduler):
"""
Queued call scheduler with a length limit.
Args:
max_len: maximal queue length; non-positive values are interpreted as no limit
can also be a tuple ``(arg_name, max_len)``, in which case the length is calculated separately
for every value of the parameter ``arg_name`` supplied to the method
on_full_queue: action to be taken if the call can't be scheduled (the queue is full); can be
``"skip_current"`` (skip the call which is being scheduled),
``"skip_newest"`` (skip the most recent call; place the current)
``"skip_oldest"`` (skip the oldest call in the queue; place the current),
``"call_current"`` (execute the call which is being scheduled immediately in the caller thread),
``"call_newest"`` (execute the most recent call immediately in the caller thread),
``"call_oldest"`` (execute the oldest call in the queue immediately in the caller thread), or
``"wait"`` (wait until the call can be scheduled, which is checked after every call removal from the queue; place the call)
call_info_argname: if not ``None``, supplies a name of a keyword argument
via which call info (generated by :meth:`QScheduler.build_call_info`) is passed on function call
"""
def __init__(self, max_len=1, on_full_queue="skip_current", call_info_argname=None):
super().__init__(on_full_queue=on_full_queue,call_info_argname=call_info_argname)
self.max_len_arg,self.max_len=max_len if isinstance(max_len,tuple) else (None,max_len)
self._arg_lens={}
self._arg_par=None
[docs]
def change_max_len(self, max_len):
"""Change maximal length of the call queue (doesn't affect already scheduled calls)"""
self.max_len_arg,self.max_len=max_len if isinstance(max_len,tuple) else (None,max_len)
[docs]
def get_current_len(self):
"""Get current number of calls in the queue"""
return len(self.call_queue)
def _get_arg_value(self, call, name):
if self._arg_par is None or self._arg_par[0] is not call.func:
sig=func_utils.funcsig(call.func)
self._arg_par=(call.func,sig)
try:
return self._arg_par[1].arg_value(name,args=call.args,kwargs=call.kwargs)
except TypeError:
return None
[docs]
def call_added(self, call):
if self.max_len_arg is not None:
arg_val=self._get_arg_value(call,self.max_len_arg)
self._arg_lens[arg_val]=self._arg_lens.get(arg_val,0)+1
[docs]
def call_popped(self, call, idx):
if self.max_len_arg is not None:
arg_val=self._get_arg_value(call,self.max_len_arg)
self._arg_lens[arg_val]-=1
[docs]
def can_schedule(self, call):
if self.max_len<=0:
return True
if self.max_len_arg is not None:
arg_val=self._get_arg_value(call,self.max_len_arg)
l=self._arg_lens.setdefault(arg_val,0)
else:
l=len(self.call_queue)
return l<self.max_len
[docs]
class QQueueSizeLimitScheduler(QQueueScheduler):
"""
Queued call scheduler with a generic size limit; similar to :class:`QQueueLengthLimitScheduler`,
but more flexible and can implement more restrictions (e.g., queue length and arguments RAM size).
Args:
max_size: maximal total size of the arguments; can be either a single number, or a tuple (if several different size metrics are involved);
non-positive values are interpreted as no limit
size_calc: function that takes a single argument (call to be placed) and returns its size; can be either a single number,
or a tuple (if several different size metrics are involved);
by default, simply returns 1, which makes the scheduler behavior identical to :class:`QQueueLengthLimitScheduler`
on_full_queue: action to be taken if the call can't be scheduled (the queue is full); can be
``"skip_current"`` (skip the call which is being scheduled),
``"skip_newest"`` (skip the most recent call; place the current)
``"skip_oldest"`` (skip the oldest call in the queue; place the current),
``"call_current"`` (execute the call which is being scheduled immediately in the caller thread),
``"call_newest"`` (execute the most recent call immediately in the caller thread),
``"call_oldest"`` (execute the oldest call in the queue immediately in the caller thread), or
``"wait"`` (wait until the call can be scheduled, which is checked after every call removal from the queue; place the call)
call_info_argname: if not ``None``, supplies a name of a keyword argument
via which call info (generated by :meth:`QScheduler.build_call_info`) is passed on function call
"""
def __init__(self, max_size=1, size_calc=None, on_full_queue="skip_current", call_info_argname=None):
super().__init__(on_full_queue=on_full_queue,call_info_argname=call_info_argname)
self.max_size=funcargparse.as_sequence(max_size)
self.size_queues=tuple([[] for _ in self.max_size])
self.size_calc=size_calc
[docs]
def change_max_size(self, max_size):
"""Change size restrictions"""
self.max_size=funcargparse.as_sequence(max_size)
[docs]
def get_current_size(self):
"""Get current size metrics"""
return tuple([sum(q) for q in self.size_queues])
def _get_size(self, call):
if self.size_calc is not None:
return self.size_calc(call)
return 1
[docs]
def call_added(self, call):
size=funcargparse.as_sequence(self._get_size(call))
for s,q in zip(size,self.size_queues):
q.append(s)
[docs]
def call_popped(self, call, idx):
for q in self.size_queues:
q.pop(idx)
[docs]
def can_schedule(self, call):
for ms,q in zip(self.max_size,self.size_queues):
if ms>0 and sum(q)>=ms:
return False
return True
[docs]
def schedule_multiple_queues(call, queues):
"""
Schedule the call simultaneously in several queues.
Go through queues in the given order and schedule call in every one of them.
If one of the schedules failed or the call has been executed there, unschedule it from all the previous queues
and return ``False``; otherwise, return ``True``.
"""
if len(queues)==0:
return True
if len(queues)==1:
return queues[0].schedule(call) and call.state=="wait"
def queue_unscheduler(queue):
return lambda: queue.unschedule(call)
added=[]
complete=False
try:
for q in queues:
call.add_callback(queue_unscheduler(q),pass_result=False,call_on_unschedule=True,front=True)
q.schedule(call)
if call.state!="wait": # skipped or executed
return False
added.append(q)
complete=True
return True
finally:
if not complete and added:
added[-1].unschedule(call) # the previous ones should be unscheduled via the callback
[docs]
class QMultiQueueScheduler:
"""
Wrapper around :func:`schedule_multiple_queues` which acts as a single scheduler.
Support additional notifiers, which are called if the scheduling is successful
(e.g., to notify and wake up the destination thread).
"""
def __init__(self, schedulers, notifiers):
self.schedulers=schedulers
self.notifiers=notifiers
[docs]
def build_call(self, *args, **kwargs):
return self.schedulers[0].build_call(*args,**kwargs)
[docs]
def schedule(self, call):
if schedule_multiple_queues(call,self.schedulers):
for n in self.notifiers:
n()
[docs]
class QThreadCallScheduler(QScheduler):
"""
Call scheduler via thread calls (:meth:`.QThreadController.call_in_thread_callback`)
Args:
thread: destination thread (by default, thread which creates the scheduler)
tag: if supplied, send the call in a message with the given tag; otherwise, use the interrupt call (generally, higher priority method).
priority: message priority (only when `tag` is not ``None``)
interrupt: whether the call is an interrupt (call inside any loop, e.g., during waiting or sleeping), or it should be called in the main event loop
call_info_argname: if not ``None``, supplies a name of a keyword argument
via which call info (generated by :meth:`QScheduler.build_call_info`) is passed on function call
"""
def __init__(self, thread=None, tag=None, priority=0, interrupt=True, call_info_argname=None):
super().__init__(call_info_argname=call_info_argname)
self.thread=thread or threadprop.current_controller()
self.tag=tag
self.priority=priority
self.interrupt=interrupt
[docs]
def schedule(self, call):
self.thread._place_call(call,tag=self.tag,priority=self.priority,interrupt=self.interrupt)
return True
[docs]
class QMulticastThreadCallScheduler(QThreadCallScheduler):
"""
Extended call scheduler via thread calls, which can limit number of queued calls.
Args:
thread: destination thread (by default, thread which creates the scheduler)
limit_queue: call queue limit (non-positive numbers are interpreted as no limit)
tag: if supplied, send the call in a message with the given tag; otherwise, use the interrupt call (generally, higher priority method).
priority: message priority (only when `tag` is not ``None``)
interrupt: whether the call is an interrupt (call inside any loop, e.g., during waiting or sleeping), or it should be called in the main event loop
call_info_argname: if not ``None``, supplies a name of a keyword argument
via which call info (generated by :meth:`QScheduler.build_call_info`) is passed on function call
"""
def __init__(self, thread=None, limit_queue=1, tag=None, priority=0, interrupt=True, call_info_argname=None):
super().__init__(thread,tag=tag,priority=priority,interrupt=interrupt,call_info_argname=call_info_argname)
self.limit_queue=limit_queue or 0
self.queue_cnt=0
self.queue_cnt_lock=threading.Lock()
def _call_done(self):
with self.queue_cnt_lock:
self.queue_cnt-=1
[docs]
def schedule(self, call):
if self.limit_queue<=0 or self.queue_cnt<self.limit_queue:
with self.queue_cnt_lock:
self.queue_cnt+=1
call.add_callback(self._call_done,pass_result=False,call_on_exception=True,front=True)
return super().schedule(call)
else:
call.skip()
return False