mirror of
https://github.com/flutter/flutter.git
synced 2026-02-20 02:29:02 +08:00
132 lines
4.4 KiB
Python
Executable File
132 lines
4.4 KiB
Python
Executable File
#!/usr/bin/env python
|
|
# Copyright 2015 The Chromium Authors. All rights reserved.
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
|
|
"""A host test module demonstrating interacting with remote subprocesses."""
|
|
|
|
import argparse
|
|
import logging
|
|
import os
|
|
import sys
|
|
import time
|
|
import xmlrpclib
|
|
|
|
# Make the shared 'testing' directory importable so the legion package
# (legion_test_case, jsonrpclib) imported below can be resolved.
TESTING_DIR = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    os.pardir, os.pardir, os.pardir, os.pardir, 'testing')
sys.path.append(TESTING_DIR)
|
|
|
|
from legion import legion_test_case
|
|
from legion import jsonrpclib
|
|
|
|
|
|
class ExampleTestController(legion_test_case.TestCase):
  """An example controller using the remote subprocess functions.

  One task machine is created in setUpClass and stored on the class as
  ``task``. Each test starts remote processes through ``self.task.Process``
  and drives them through the returned process object.
  """

  @classmethod
  def setUpClass(cls):
    """Creates the task machine and waits until it connects.

    Reads ``--task-hash`` and ``--os`` (default ``Ubuntu-14.04``) from the
    command line. parse_known_args is used, so unrecognized arguments are
    ignored instead of aborting the run.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--task-hash')
    parser.add_argument('--os', default='Ubuntu-14.04')
    args, _ = parser.parse_known_args()

    cls.task = cls.CreateTask(
        isolated_hash=args.task_hash,
        dimensions={'os': args.os},
        idle_timeout_secs=90,
        connection_timeout_secs=90,
        verbosity=logging.DEBUG)
    cls.task.Create()
    cls.task.WaitForConnection()

  def testMultipleProcesses(self):
    """Tests that processes can be run and controlled simultaneously.

    Starts "sleep 10" and "sleep 20" back to back and checks that the first
    finishes after 10-11 seconds and the second after at least 20 seconds,
    all measured from the same start time.
    """
    start = time.time()
    logging.info('Starting "sleep 10" and "sleep 20"')
    sleep10 = self.task.Process(['sleep', '10'])
    sleep20 = None
    try:
      sleep20 = self.task.Process(['sleep', '20'])

      logging.info('Waiting for sleep 10 to finish and verifying timing')
      sleep10.Wait()
      elapsed = time.time() - start
      self.assertGreaterEqual(elapsed, 10)
      self.assertLess(elapsed, 11)

      logging.info('Waiting for sleep 20 to finish and verifying timing')
      sleep20.Wait()
      elapsed = time.time() - start
      self.assertGreaterEqual(elapsed, 20)
    finally:
      # Clean up even when a timing assertion fails so the remote processes
      # are not left behind for the tests that share this task.
      sleep10.Delete()
      if sleep20 is not None:
        sleep20.Delete()

  def testTerminate(self):
    """Tests that a process can be correctly terminated.

    Terminates "sleep 20" right after starting it and checks that the whole
    sequence completes in under 20 seconds.
    """
    start = time.time()

    logging.info('Starting "sleep 20"')
    sleep20 = self.task.Process(['sleep', '20'])
    try:
      logging.info('Calling Terminate()')
      sleep20.Terminate()
      try:
        logging.info('Trying to wait for sleep 20 to complete')
        sleep20.Wait()
      except (xmlrpclib.Fault, jsonrpclib.Fault) as fault:
        # A fault from Wait() on a terminated process is tolerated; the
        # elapsed-time check below is what verifies the termination. Both
        # fault types are accepted because testKeyReuse shows this task's RPC
        # failures surfacing as jsonrpclib.Fault.
        logging.info('Wait() raised a fault after Terminate(): %s', fault)
    finally:
      sleep20.Delete()
    logging.info('Checking to make sure sleep 20 was actually terminated')
    self.assertLess(time.time() - start, 20)

  def testLs(self):
    """Tests that the returned results from a process are correct.

    Runs "ls" and expects a zero return code and 'task.isolate' in stdout.
    """
    logging.info('Calling "ls"')
    ls = self.task.Process(['ls'])
    try:
      logging.info('Trying to wait for ls to complete')
      ls.Wait()
      logging.info(
          'Checking that ls completed and returned the correct results')
      self.assertEqual(ls.GetReturncode(), 0)
      self.assertIn('task.isolate', ls.ReadStdout())
    finally:
      ls.Delete()

  def testProcessOutput(self):
    """Tests that a process's output gets logged to a file in the output-dir."""
    code = ('import sys\n'
            'sys.stdout.write("Hello stdout")\n'
            'sys.stderr.write("Hello stderr")')
    self.task.rpc.WriteFile('test.py', code)
    proc = self.task.Process(['python', 'test.py'])
    try:
      # Process() does not block until the command finishes (see
      # testMultipleProcesses), so wait before reading the output files.
      proc.Wait()

      self.CheckProcessOutput('stdout', proc.key, 'Hello stdout')
      self.CheckProcessOutput('stderr', proc.key, 'Hello stderr')
    finally:
      proc.Delete()

  def testCustomKey(self):
    """Tests that a custom key passed to a process works correctly."""
    code = ('import sys\n'
            'sys.stdout.write("Hello CustomKey stdout")\n'
            'sys.stderr.write("Hello CustomKey stderr")')
    self.task.rpc.WriteFile('test.py', code)
    proc = self.task.Process(['python', 'test.py'], key='CustomKey')
    try:
      # Process() does not block until the command finishes (see
      # testMultipleProcesses), so wait before reading the output files.
      proc.Wait()

      self.CheckProcessOutput('stdout', 'CustomKey', 'Hello CustomKey stdout')
      self.CheckProcessOutput('stderr', 'CustomKey', 'Hello CustomKey stderr')
    finally:
      proc.Delete()

  def testKeyReuse(self):
    """Tests that a key cannot be reused."""
    self.task.Process(['ls'], key='KeyReuse')
    # A second process with the same key must be rejected with an RPC fault.
    self.assertRaises(jsonrpclib.Fault, self.task.Process, ['ls'],
                      key='KeyReuse')

  def CheckProcessOutput(self, pipe, key, expected):
    """Checks that a process' output files are correct.

    Reads the file named '<key>.<pipe>' from the task's output directory and
    asserts that its content equals the expected text.

    Args:
      pipe: Suffix of the output file name; callers pass 'stdout' or 'stderr'.
      key: The process key, used as the file name prefix.
      expected: The exact content the file is expected to hold.
    """
    logging.info('Reading output file')
    output_dir = self.task.rpc.GetOutputDir()
    path = self.task.rpc.PathJoin(output_dir, '%s.%s' % (key, pipe))
    actual = self.task.rpc.ReadFile(path)
    self.assertEqual(expected, actual)
|
|
|
|
if __name__ == '__main__':
  # Delegate to the legion test case module's entry point when run directly.
  legion_test_case.main()
|