# Copyright 2014 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import unittest # pylint: disable=F0401 from mojo_bindings import promise class PromiseTest(unittest.TestCase): def setUp(self): self.accumulated = [] def _AddToAccumulated(self, res): self.accumulated.append(res) return res def testResolve(self): p = promise.Promise.Resolve(0) self.assertEquals(p.state, promise.Promise.STATE_FULLFILLED) p.Then(self._AddToAccumulated) self.assertEquals(self.accumulated, [0]) def testResolveToPromise(self): p = promise.Promise.Resolve(0) self.assertEquals(p.state, promise.Promise.STATE_FULLFILLED) q = promise.Promise.Resolve(p) self.assertEquals(p.state, promise.Promise.STATE_FULLFILLED) q.Then(self._AddToAccumulated) self.assertEquals(self.accumulated, [0]) def testReject(self): p = promise.Promise.Reject(0) self.assertEquals(p.state, promise.Promise.STATE_REJECTED) p.Then(onRejected=self._AddToAccumulated) self.assertEquals(self.accumulated, [0]) def testGeneratorFunctionResolve(self): (p, resolve, _) = _GetPromiseAndFunctions() self.assertEquals(p.state, promise.Promise.STATE_PENDING) p.Then(self._AddToAccumulated) resolve(0) self.assertEquals(p.state, promise.Promise.STATE_FULLFILLED) self.assertEquals(self.accumulated, [0]) def testGeneratorFunctionReject(self): (p, _, reject) = _GetPromiseAndFunctions() self.assertEquals(p.state, promise.Promise.STATE_PENDING) p.Then(None, self._AddToAccumulated) reject(0) self.assertEquals(p.state, promise.Promise.STATE_REJECTED) self.assertEquals(self.accumulated, [0]) def testGeneratorFunctionResolveToPromise(self): (p1, resolve, _) = _GetPromiseAndFunctions() p2 = promise.Promise(lambda x, y: x(p1)) self.assertEquals(p2.state, promise.Promise.STATE_PENDING) p2.Then(self._AddToAccumulated) resolve(promise.Promise.Resolve(0)) self.assertEquals(self.accumulated, [0]) def testComputation(self): (p, resolve, _) = _GetPromiseAndFunctions() p.Then(lambda x: x+1).Then(lambda x: x+2).Then(self._AddToAccumulated) self.assertEquals(self.accumulated, []) resolve(0) self.assertEquals(self.accumulated, [3]) def testRecoverAfterException(self): (p, resolve, _) = _GetPromiseAndFunctions() q = p.Then(_ThrowException).Catch(self._AddToAccumulated) self.assertEquals(self.accumulated, []) resolve(0) self.assertEquals(q.state, promise.Promise.STATE_FULLFILLED) self.assertEquals(len(self.accumulated), 1) self.assertIsInstance(self.accumulated[0], RuntimeError) self.assertEquals(self.accumulated[0].message, 0) def testMultipleRejectResolve(self): (p, resolve, reject) = _GetPromiseAndFunctions() p.Then(self._AddToAccumulated, self._AddToAccumulated) resolve(0) self.assertEquals(self.accumulated, [0]) resolve(0) self.assertEquals(self.accumulated, [0]) reject(0) self.assertEquals(self.accumulated, [0]) self.accumulated = [] (p, resolve, reject) = _GetPromiseAndFunctions() p.Then(self._AddToAccumulated, self._AddToAccumulated) reject(0) self.assertEquals(self.accumulated, [0]) resolve(0) self.assertEquals(self.accumulated, [0]) reject(0) self.assertEquals(self.accumulated, [0]) def testAll(self): promises_and_functions = [_GetPromiseAndFunctions() for x in xrange(10)] promises = [x[0] for x in promises_and_functions] all_promise = promise.Promise.All(*promises) res = [] def AddToRes(values): res.append(values) all_promise.Then(AddToRes, AddToRes) for i, (_, resolve, _) in enumerate(promises_and_functions): self.assertEquals(len(res), 0) resolve(i) self.assertEquals(len(res), 1) self.assertEquals(res[0], [i for i in xrange(10)]) self.assertEquals(all_promise.state, promise.Promise.STATE_FULLFILLED) def testAllFailure(self): promises_and_functions = [_GetPromiseAndFunctions() for x in xrange(10)] promises = [x[0] for x in promises_and_functions] all_promise = promise.Promise.All(*promises) res = [] def AddToRes(values): res.append(values) all_promise.Then(AddToRes, AddToRes) for i in xrange(10): if i <= 5: self.assertEquals(len(res), 0) else: self.assertEquals(len(res), 1) if i != 5: promises_and_functions[i][1](i) else: promises_and_functions[i][2]('error') self.assertEquals(len(res), 1) self.assertEquals(res[0], 'error') self.assertEquals(all_promise.state, promise.Promise.STATE_REJECTED) def testRace(self): promises_and_functions = [_GetPromiseAndFunctions() for x in xrange(10)] promises = [x[0] for x in promises_and_functions] race_promise = promise.Promise.Race(*promises) res = [] def AddToRes(values): res.append(values) race_promise.Then(AddToRes, AddToRes) self.assertEquals(len(res), 0) promises_and_functions[7][1]('success') self.assertEquals(len(res), 1) for i, (f) in enumerate(promises_and_functions): f[1 + (i % 2)](i) self.assertEquals(len(res), 1) self.assertEquals(res[0], 'success') self.assertEquals(race_promise.state, promise.Promise.STATE_FULLFILLED) def testRaceFailure(self): promises_and_functions = [_GetPromiseAndFunctions() for x in xrange(10)] promises = [x[0] for x in promises_and_functions] race_promise = promise.Promise.Race(*promises) res = [] def AddToRes(values): res.append(values) race_promise.Then(AddToRes, AddToRes) self.assertEquals(len(res), 0) promises_and_functions[7][2]('error') self.assertEquals(len(res), 1) for i, (f) in enumerate(promises_and_functions): f[1 + (i % 2)](i) self.assertEquals(len(res), 1) self.assertEquals(res[0], 'error') self.assertEquals(race_promise.state, promise.Promise.STATE_REJECTED) def testAsync(self): @promise.async def ComputeAdd(*values, **kwvalues): return sum(values) + sum(kwvalues.values()) res = [] def AddToRes(values): res.append(values) # Simple test. ComputeAdd(1, 2, foo=3).Then(AddToRes) self.assertEquals(res, [6]) # Resolve promises res = [] promises_and_functions = [_GetPromiseAndFunctions() for x in xrange(10)] promises = [x[0] for x in promises_and_functions] dict_promises = dict(zip(map(str, xrange(10)), promises)) add_promise = ComputeAdd(*promises, **dict_promises).Then(AddToRes) self.assertEquals(len(res), 0) self.assertEquals(add_promise.state, promise.Promise.STATE_PENDING) for _, r, _ in promises_and_functions: r(1) self.assertEquals(res, [20]) self.assertEquals(add_promise.state, promise.Promise.STATE_FULLFILLED) # Fail promise res = [] promises_and_functions = [_GetPromiseAndFunctions() for x in xrange(10)] promises = [x[0] for x in promises_and_functions] add_promise = ComputeAdd(*promises).Then(AddToRes).Catch(AddToRes) self.assertEquals(len(res), 0) self.assertEquals(add_promise.state, promise.Promise.STATE_PENDING) promises_and_functions[7][2]('error') self.assertEquals(res, ['error']) self.assertEquals(add_promise.state, promise.Promise.STATE_FULLFILLED) def testAttributeGetter(self): class MyObject(object): def __init__(self): self.value = 0 def GetValue(self, value=None): return value p = promise.Promise.Resolve(MyObject()) res = [] def AddToRes(values): res.append(values) p.value.Then(AddToRes) p.GetValue(promise.Promise.Resolve(1)).Then(AddToRes) self.assertEquals(res, [0, 1]) res = [] p.GetTwo().Catch(AddToRes) self.assertEquals(len(res), 1) def _GetPromiseAndFunctions(): functions = {} def GeneratorFunction(resolve, reject): functions['resolve'] = resolve functions['reject'] = reject p = promise.Promise(GeneratorFunction) return (p, functions['resolve'], functions['reject']) def _ThrowException(x): raise RuntimeError(x)