mirror of
https://github.com/flutter/flutter.git
synced 2026-02-20 02:29:02 +08:00
97 lines
2.8 KiB
Python
97 lines
2.8 KiB
Python
# Copyright 2015 The Chromium Authors. All rights reserved.
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
|
|
import random
|
|
|
|
import mojo_unittest
|
|
from mojo_bindings import promise
|
|
|
|
# pylint: disable=F0401
|
|
import mojo_system as system
|
|
|
|
# pylint: disable=F0401
|
|
from mojo_utils import data_pipe_utils
|
|
|
|
|
|
def _GetRandomBuffer(size):
|
|
random.seed(size)
|
|
return bytearray(''.join(chr(random.randint(0, 255)) for i in xrange(size)))
|
|
|
|
|
|
class DataPipeCopyTest(mojo_unittest.MojoTestCase):
|
|
def setUp(self):
|
|
super(DataPipeCopyTest, self).setUp()
|
|
self.handles = system.DataPipe()
|
|
self.error = None
|
|
|
|
def tearDown(self):
|
|
self.handles = None
|
|
super(DataPipeCopyTest, self).tearDown()
|
|
|
|
def _writeDataAndClose(self, handle, data):
|
|
status, num_bytes_written = handle.WriteData(data)
|
|
handle.Close()
|
|
self.assertEquals(system.RESULT_OK, status)
|
|
self.assertEquals(len(data), num_bytes_written)
|
|
|
|
def _copyDataFromPipe(self, handle, expected_data,
|
|
deadline=system.DEADLINE_INDEFINITE):
|
|
self._VerifyDataCopied(data_pipe_utils.CopyFromDataPipe(
|
|
handle, deadline), expected_data).Catch(self._CatchError)
|
|
|
|
def _CatchError(self, error):
|
|
if self.loop:
|
|
self.loop.Quit()
|
|
self.error = error
|
|
|
|
@promise.async
|
|
def _VerifyDataCopied(self, data, expected_data):
|
|
self.assertEquals(expected_data, data)
|
|
self.loop.Quit()
|
|
|
|
def _runAndCheckError(self):
|
|
self.loop.Run()
|
|
if self.error:
|
|
# pylint: disable=E0702
|
|
raise self.error
|
|
|
|
def _testEagerWrite(self, data):
|
|
self._writeDataAndClose(self.handles.producer_handle, data)
|
|
self._copyDataFromPipe(self.handles.consumer_handle, data)
|
|
self._runAndCheckError()
|
|
|
|
def _testDelayedWrite(self, data):
|
|
self._copyDataFromPipe(self.handles.consumer_handle, data)
|
|
self._writeDataAndClose(self.handles.producer_handle, data)
|
|
self._runAndCheckError()
|
|
|
|
def testTimeout(self):
|
|
self._copyDataFromPipe(self.handles.consumer_handle, bytearray(),
|
|
deadline=100)
|
|
with self.assertRaises(data_pipe_utils.DataPipeCopyException):
|
|
self._runAndCheckError()
|
|
|
|
def testCloseProducerWithoutWriting(self):
|
|
self._copyDataFromPipe(self.handles.consumer_handle, bytearray())
|
|
self.handles.producer_handle.Close()
|
|
self._runAndCheckError()
|
|
|
|
def testEagerWriteOfEmptyData(self):
|
|
self._testEagerWrite(bytearray())
|
|
|
|
def testDelayedWriteOfEmptyData(self):
|
|
self._testDelayedWrite(bytearray())
|
|
|
|
def testEagerWriteOfNonEmptyData(self):
|
|
self._testEagerWrite(_GetRandomBuffer(1024))
|
|
|
|
def testDelayedWriteOfNonEmptyData(self):
|
|
self._testDelayedWrite(_GetRandomBuffer(1024))
|
|
|
|
def testEagerWriteOfLargeBuffer(self):
|
|
self._testEagerWrite(_GetRandomBuffer(32 * 1024))
|
|
|
|
def testDelayedWriteOfLargeBuffer(self):
|
|
self._testDelayedWrite(_GetRandomBuffer(32 * 1024))
|