Source code for ivy.plugin.parallel_plugin_collection

# IVY is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# IVY is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with IVY.  If not, see <http://www.gnu.org/licenses/>.


'''
Created on Mar 18, 2014

author: jakeret
'''
from __future__ import print_function, division, absolute_import
from ivy.plugin.base_plugin import BasePlugin
from ivy.loop import Loop
from ivy import context
from ivy.plugin.plugin_factory import PluginFactory
from ivy.exceptions.exceptions import InvalidAttributeException
from ivy import backend


[docs]class ParallelPluginCollection(BasePlugin): """ Collection that allows for executing plugins in parallel by using a MapReduce aprach. The implementation therefore requires a list of plugins to execute, a map plugin creating the workload and (optionally) a reduce plugin reducing the data from the parallel task exection :param pluginList: List of plugins (or a Loop) which should be executed in parallel :param mapPlugin: :param reducePlugin: (optional) :param ctx: (optional) """ def __init__(self, pluginList, mapPlugin, reducePlugin=None, ctx=None, parallel=True): ''' Constructor ''' if ctx is None: ctx = context.ctx() self.ctx = ctx super(ParallelPluginCollection, self).__init__(self.ctx) if not isinstance(pluginList, Loop): pluginList = Loop(pluginList) self.pluginList = pluginList if mapPlugin is None: raise InvalidAttributeException("No map plugin provided") self.mapPlugin = mapPlugin self.reducePlugin = reducePlugin self.parallel = parallel def __str__(self): return "ParallelPluginCollection" def __call__(self): force = None if not self.parallel: force = "sequential" backendImpl = backend.create(self.ctx, force) mapPlugin = self.mapPlugin if isinstance(self.mapPlugin, basestring): mapPlugin = PluginFactory.createInstance(mapPlugin, self.ctx) ctxList = backendImpl.run(self.pluginList, mapPlugin) if self.reducePlugin is not None: reducePlugin = self.reducePlugin if isinstance(self.reducePlugin, basestring): reducePlugin = PluginFactory.createInstance(reducePlugin, self.ctx) reducePlugin.reduce(ctxList)