Source code for scoop.shared

#!/usr/bin/env python
#
#    This file is part of Scalable COncurrent Operations in Python (SCOOP). 
#
#    SCOOP is free software: you can redistribute it and/or modify
#    it under the terms of the GNU Lesser General Public License as
#    published by the Free Software Foundation, either version 3 of
#    the License, or (at your option) any later version.
#
#    SCOOP is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#    GNU Lesser General Public License for more details.
#
#    You should have received a copy of the GNU Lesser General Public
#    License along with SCOOP. If not, see <http://www.gnu.org/licenses/>.
#

import itertools
from functools import reduce
from . import encapsulation
import time

elements = None


def _ensureAtomicity(fn):
    """Ensure atomicity of passed elements on the whole worker pool"""
    def wrapper(*args, **kwargs):
        """setConst(**kwargs)
        Set a constant that will be shared to every workers.

        :param \*\*kwargs: One or more combination(s) key=value. Key being the
            variable name and value the object to share.

        :returns: None.

        Usage: setConst(name=value)
        """
        # Note that the docstring is the one of setConst. This is because of
        # sphinx.

        # TODO: Import that elsewhere
        from . import _control

        # Enforce retrieval of currently awaiting constants
        _control.execQueue.socket.pumpInfoSocket()

        for key, value in kwargs.items():
            # Object name existence check
            if key in itertools.chain(*(elem.keys() for elem in elements.values())):
                raise TypeError("This constant already exists: {0}.".format(key))
            
        # Call the function
        fn(*args, **kwargs)

        # Wait for element propagation
        import time
        import scoop
        while all(key in elements[scoop.worker] for key in kwargs.keys()) is not True:
            # Enforce retrieval of currently awaiting constants
            _control.execQueue.socket.pumpInfoSocket()
            # TODO: Make previous blocking instead of sleep
            time.sleep(0.1)

        # Atomicity check
        elementNames = list(itertools.chain(*(elem.keys() for elem in elements.values())))
        if len(elementNames) != len(set(elementNames)):
            raise TypeError("This constant already exists: {0}.".format(key))

    return wrapper


@_ensureAtomicity
[docs]def setConst(**kwargs): """setConst(**kwargs) Set a constant that will be shared to every workers. :param **kwargs: One or more combination(s) key=value. Key being the variable name and value the object to share. :returns: None. Usage: setConst(name=value) """ # TODO: Import that elsewhere from . import _control sendVariable = _control.execQueue.socket.sendVariable for key, value in kwargs.items(): # Propagate the constant if hasattr(value, '__code__'): sendVariable(key, encapsulation.FunctionEncapsulation(value)) # TODO: file-like objects with encapsulation.ExternalEncapsulation else: sendVariable(key, value)
[docs]def getConst(name, timeout=0.1): """Get a constant that was shared beforehand. :param name: The name of the shared variable to retrieve. :param timeout: The maximum time to wait in seconds for the propagation of the variable. :returns: The shared object. Usage: value = getConst('name') """ # TODO: Import that elsewhere from . import _control import time timeStamp = time.time() while True: # Enforce retrieval of currently awaiting constants _control.execQueue.socket.pumpInfoSocket() # Constants concatenation constants = dict(reduce(lambda x, y: x + list(y.items()), elements.values(), [])) timeoutHappened = time.time() - timeStamp > timeout if constants.get(name) is not None or timeoutHappened: return constants.get(name) time.sleep(0.01)

Manual

Back to Welcome