Source code for scoop.shared
#!/usr/bin/env python
#
# This file is part of Scalable COncurrent Operations in Python (SCOOP).
#
# SCOOP is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3 of
# the License, or (at your option) any later version.
#
# SCOOP is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with SCOOP. If not, see <http://www.gnu.org/licenses/>.
#
import itertools
from inspect import ismethod
from functools import reduce
import time
from . import encapsulation, utils
import scoop
from .fallbacks import ensureScoopStartedProperly, NotStartedProperly
elements = None
def _ensureAtomicity(fn):
"""Ensure atomicity of passed elements on the whole worker pool"""
@ensureScoopStartedProperly
def wrapper(*args, **kwargs):
"""setConst(**kwargs)
Set a constant that will be shared to every workers.
This call blocks until the constant has propagated to at least one
worker.
:param \*\*kwargs: One or more combination(s) key=value. Key being the
variable name and value the object to share.
:returns: None.
Usage: setConst(name=value)
"""
# Note that the docstring is the one of setConst.
# This is because of the documentation framework (sphinx) limitations.
from . import _control
# Enforce retrieval of currently awaiting constants
_control.execQueue.socket.pumpInfoSocket()
for key, value in kwargs.items():
# Object name existence check
if key in itertools.chain(*(elem.keys() for elem in elements.values())):
raise TypeError("This constant already exists: {0}.".format(key))
# Retry element propagation until it is returned
while all(key in elements.get(scoop.worker, []) for key in kwargs.keys()) is not True:
scoop.logger.debug("Sending global variables {0}...".format(
list(kwargs.keys())
))
# Call the function
fn(*args, **kwargs)
# Enforce retrieval of currently awaiting constants
_control.execQueue.socket.pumpInfoSocket()
# TODO: Make previous blocking instead of sleep
time.sleep(0.1)
# Atomicity check
elementNames = list(itertools.chain(*(elem.keys() for elem in elements.values())))
if len(elementNames) != len(set(elementNames)):
raise TypeError("This constant already exists: {0}.".format(key))
return wrapper
@_ensureAtomicity
def setConst(**kwargs):
"""setConst(**kwargs)
Set a constant that will be shared to every workers.
:param **kwargs: One or more combination(s) key=value. Key being the
variable name and value the object to share.
:returns: None.
Usage: setConst(name=value)
"""
from . import _control
sendVariable = _control.execQueue.socket.sendVariable
for key, value in kwargs.items():
# Propagate the constant
# for file-like objects, see encapsulation.py where copyreg was
# used to overload standard pickling.
if callable(value):
sendVariable(key, encapsulation.FunctionEncapsulation(value, key))
else:
sendVariable(key, value)
[docs]def getConst(name, timeout=0.1):
"""Get a shared constant.
:param name: The name of the shared variable to retrieve.
:param timeout: The maximum time to wait in seconds for the propagation of
the constant.
:returns: The shared object.
Usage: value = getConst('name')
"""
from . import _control
import time
timeStamp = time.time()
while True:
# Enforce retrieval of currently awaiting constants
_control.execQueue.socket.pumpInfoSocket()
# Constants concatenation
constants = dict(reduce(
lambda x, y: x + list(y.items()),
elements.values(),
[]
))
timeoutHappened = time.time() - timeStamp > timeout
if constants.get(name) is not None or timeoutHappened:
return constants.get(name)
time.sleep(0.01)
[docs]class SharedElementEncapsulation(object):
"""Encapsulates a reference to an element available in the shared module.
This is used by Futures (map on lambda, for instance)."""
def __init__(self, element):
self.isMethod = False
if utils.isStr(element):
# Already shared element
assert getConst(element, timeout=0) != None, (
"Element must already be shared."
)
self.uniqueID = element
else:
# Element to share
# Determine if function is a method. Methods derived from external
# languages such as C++ aren't detected by ismethod.
if ismethod(element):
# Must share whole object before ability to use its method
self.isMethod = True
self.methodName = element.__name__
element = element.__self__
# Lambda-like or unshared code to share
uniqueID = str(scoop.worker) + str(id(element)) + str(hash(element))
self.uniqueID = uniqueID
if getConst(uniqueID, timeout=0) == None:
funcRef = {uniqueID: element}
setConst(**funcRef)
def __repr__(self):
return self.uniqueID
def __call__(self, *args, **kwargs):
if self.isMethod:
wholeObj = getConst(
self.__repr__(),
timeout=float("inf"),
)
return getattr(wholeObj, self.methodName)(*args, **kwargs)
else:
return getConst(self.__repr__(),
timeout=float("inf"))(*args, **kwargs)
def __name__(self):
return self.__repr__()