77 lines
2.7 KiB
Python
77 lines
2.7 KiB
Python
from functools import reduce
from inspect import signature
from typing import Any, Dict, List, Optional

from src.pluginbase import PluginBase
|
|
|
|
|
|
def flat_map(array: List[List]) -> List:
    """Concatenate the lists contained in *array* into one new list.

    Args:
        array: A list whose elements are themselves lists.

    Returns:
        A freshly built list holding the items of every inner list, in
        order. An empty *array* yields an empty list, and the result never
        aliases one of the inner lists, so mutating it cannot affect the
        inputs.
    """
    # A single pass keeps this linear in the total number of items and needs
    # no special case for empty or single-element input.
    return [item for inner in array for item in inner]
|
|
|
|
|
|
class ZonedPluginRegistry:
    """Name-keyed collection of plugins whose hook methods are called by name.

    A hook is any callable attribute found on a registered plugin. Plugins
    that do not expose the requested attribute (or expose a non-callable
    one) are silently skipped. Plugins are visited in the iteration order of
    the ``plugins`` dict.
    """

    def __init__(self):
        # Registered plugins keyed by the name they were registered under.
        self.plugins: Dict[str, "PluginBase"] = {}

    def register_plugin(self, name: str, plugin: "PluginBase"):
        """Store *plugin* under *name*, replacing any plugin already there."""
        self.plugins[name] = plugin

    def copy(self):
        """Return a new registry holding a ``copy()`` of every plugin.

        Each plugin is duplicated through its own ``copy()`` method and
        registered in the new registry under the same name.
        """
        result = ZonedPluginRegistry()
        for name, plugin in self.plugins.items():
            result.register_plugin(name, plugin.copy())
        return result

    def execute_single(self, function_name: str, *args) -> Optional[Any]:
        """Call the hook on the first plugin that provides it.

        Returns:
            That plugin's return value, or ``None`` when no plugin exposes a
            callable named *function_name*.

        Raises:
            RuntimeError: If the hook's parameter count differs from the
                number of *args*.
        """
        return self._execute(function_name, True, *args)

    def execute(self, function_name: str, *args) -> List[Any]:
        """Call the hook on every plugin that provides it.

        Returns:
            The return values in plugin order; empty when no plugin exposes
            a callable named *function_name*.

        Raises:
            RuntimeError: If a hook's parameter count differs from the
                number of *args*.
        """
        return self._execute(function_name, False, *args)

    def execute_flat_map(self, function_name: str, *args) -> List[Any]:
        """Call the hook on every plugin and concatenate the returned lists.

        Each hook result is expected to be a list. When no plugin provides
        the hook the result is an empty list.
        """
        results = self.execute(function_name, *args)
        if not results:
            # Nothing to flatten when no plugin implements the hook.
            return []
        return flat_map(results)

    def execute_remove_falsy(self, function_name: str, *args) -> List[Any]:
        """Call the hook on every plugin and drop falsy return values."""
        return [r for r in self.execute(function_name, *args) if r]

    def execute_flat_map_remove_falsy(self, function_name: str, *args) -> List[Any]:
        """Flatten the hook results as ``execute_flat_map`` does, then drop falsy items."""
        return [r for r in self.execute_flat_map(function_name, *args) if r]

    def _execute(self, function_name: str, return_first: bool, *args) -> Any:
        """Dispatch *function_name* to the registered plugins.

        Args:
            function_name: Attribute name looked up on each plugin.
            return_first: When true, stop at the first plugin that provides
                the hook and return its value directly.
            *args: Positional arguments forwarded to the hook unchanged.

        Returns:
            With ``return_first``: the first hook's return value, or
            ``None`` if no plugin provides the hook. Otherwise a list of all
            return values in plugin order.

        Raises:
            RuntimeError: If a hook's signature does not declare exactly
                ``len(args)`` parameters. Every declared parameter counts,
                including ones that have defaults.
        """
        result = []
        for plugin in self.plugins.values():
            fun = getattr(plugin, function_name, None)
            if not callable(fun):
                # This plugin does not implement the hook.
                continue

            # Reject mismatched hooks up front so the error names the plugin
            # instead of surfacing as a TypeError from the call itself.
            sig = signature(fun)
            if len(sig.parameters) != len(args):
                raise RuntimeError("method %s.%s has wrong number of arguments. expected %s but was %s " % (
                    plugin, function_name, len(args), len(sig.parameters)))

            # Argument unpacking forwards any number of positional arguments.
            return_value = fun(*args)
            if return_first:
                return return_value
            result.append(return_value)

        if return_first:
            return None
        return result
|