// Copyright 2013 The Flutter Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #define FML_USED_ON_EMBEDDER #include #include #include "flutter/fml/message_loop.h" #include "flutter/fml/synchronization/count_down_latch.h" #include "flutter/fml/synchronization/waitable_event.h" #include "flutter/fml/task_runner.h" #include "gtest/gtest.h" #define TIME_SENSITIVE(x) TimeSensitiveTest_##x #if OS_WIN #define PLATFORM_SPECIFIC_CAPTURE(...) [ __VA_ARGS__, count ] #else #define PLATFORM_SPECIFIC_CAPTURE(...) [__VA_ARGS__] #endif TEST(MessageLoop, GetCurrent) { std::thread thread([]() { fml::MessageLoop::EnsureInitializedForCurrentThread(); ASSERT_TRUE(fml::MessageLoop::GetCurrent().GetTaskRunner()); }); thread.join(); } TEST(MessageLoop, DifferentThreadsHaveDifferentLoops) { fml::MessageLoop* loop1 = nullptr; fml::AutoResetWaitableEvent latch1; fml::AutoResetWaitableEvent term1; std::thread thread1([&loop1, &latch1, &term1]() { fml::MessageLoop::EnsureInitializedForCurrentThread(); loop1 = &fml::MessageLoop::GetCurrent(); latch1.Signal(); term1.Wait(); }); fml::MessageLoop* loop2 = nullptr; fml::AutoResetWaitableEvent latch2; fml::AutoResetWaitableEvent term2; std::thread thread2([&loop2, &latch2, &term2]() { fml::MessageLoop::EnsureInitializedForCurrentThread(); loop2 = &fml::MessageLoop::GetCurrent(); latch2.Signal(); term2.Wait(); }); latch1.Wait(); latch2.Wait(); ASSERT_FALSE(loop1 == loop2); term1.Signal(); term2.Signal(); thread1.join(); thread2.join(); } TEST(MessageLoop, CanRunAndTerminate) { bool started = false; bool terminated = false; std::thread thread([&started, &terminated]() { fml::MessageLoop::EnsureInitializedForCurrentThread(); auto& loop = fml::MessageLoop::GetCurrent(); ASSERT_TRUE(loop.GetTaskRunner()); loop.GetTaskRunner()->PostTask([&terminated]() { fml::MessageLoop::GetCurrent().Terminate(); terminated = true; }); loop.Run(); started = true; }); thread.join(); ASSERT_TRUE(started); ASSERT_TRUE(terminated); } TEST(MessageLoop, NonDelayedTasksAreRunInOrder) { const size_t count = 100; bool started = false; bool terminated = false; std::thread thread([&started, &terminated, count]() { fml::MessageLoop::EnsureInitializedForCurrentThread(); auto& loop = fml::MessageLoop::GetCurrent(); size_t current = 0; for (size_t i = 0; i < count; i++) { loop.GetTaskRunner()->PostTask( PLATFORM_SPECIFIC_CAPTURE(&terminated, i, ¤t)() { ASSERT_EQ(current, i); current++; if (count == i + 1) { fml::MessageLoop::GetCurrent().Terminate(); terminated = true; } }); } loop.Run(); ASSERT_EQ(current, count); started = true; }); thread.join(); ASSERT_TRUE(started); ASSERT_TRUE(terminated); } TEST(MessageLoop, DelayedTasksAtSameTimeAreRunInOrder) { const size_t count = 100; bool started = false; bool terminated = false; std::thread thread([&started, &terminated, count]() { fml::MessageLoop::EnsureInitializedForCurrentThread(); auto& loop = fml::MessageLoop::GetCurrent(); size_t current = 0; const auto now_plus_some = fml::TimePoint::Now() + fml::TimeDelta::FromMilliseconds(2); for (size_t i = 0; i < count; i++) { loop.GetTaskRunner()->PostTaskForTime( PLATFORM_SPECIFIC_CAPTURE(&terminated, i, ¤t)() { ASSERT_EQ(current, i); current++; if (count == i + 1) { fml::MessageLoop::GetCurrent().Terminate(); terminated = true; } }, now_plus_some); } loop.Run(); ASSERT_EQ(current, count); started = true; }); thread.join(); ASSERT_TRUE(started); ASSERT_TRUE(terminated); } TEST(MessageLoop, CheckRunsTaskOnCurrentThread) { fml::RefPtr runner; fml::AutoResetWaitableEvent latch; std::thread thread([&runner, &latch]() { fml::MessageLoop::EnsureInitializedForCurrentThread(); auto& loop = fml::MessageLoop::GetCurrent(); runner = loop.GetTaskRunner(); latch.Signal(); ASSERT_TRUE(loop.GetTaskRunner()->RunsTasksOnCurrentThread()); }); latch.Wait(); ASSERT_TRUE(runner); ASSERT_FALSE(runner->RunsTasksOnCurrentThread()); thread.join(); } TEST(MessageLoop, TIME_SENSITIVE(SingleDelayedTaskByDelta)) { bool checked = false; std::thread thread([&checked]() { fml::MessageLoop::EnsureInitializedForCurrentThread(); auto& loop = fml::MessageLoop::GetCurrent(); auto begin = fml::TimePoint::Now(); loop.GetTaskRunner()->PostDelayedTask( [begin, &checked]() { auto delta = fml::TimePoint::Now() - begin; auto ms = delta.ToMillisecondsF(); ASSERT_GE(ms, 3); ASSERT_LE(ms, 7); checked = true; fml::MessageLoop::GetCurrent().Terminate(); }, fml::TimeDelta::FromMilliseconds(5)); loop.Run(); }); thread.join(); ASSERT_TRUE(checked); } TEST(MessageLoop, TIME_SENSITIVE(SingleDelayedTaskForTime)) { bool checked = false; std::thread thread([&checked]() { fml::MessageLoop::EnsureInitializedForCurrentThread(); auto& loop = fml::MessageLoop::GetCurrent(); auto begin = fml::TimePoint::Now(); loop.GetTaskRunner()->PostTaskForTime( [begin, &checked]() { auto delta = fml::TimePoint::Now() - begin; auto ms = delta.ToMillisecondsF(); ASSERT_GE(ms, 3); ASSERT_LE(ms, 7); checked = true; fml::MessageLoop::GetCurrent().Terminate(); }, fml::TimePoint::Now() + fml::TimeDelta::FromMilliseconds(5)); loop.Run(); }); thread.join(); ASSERT_TRUE(checked); } TEST(MessageLoop, TIME_SENSITIVE(MultipleDelayedTasksWithIncreasingDeltas)) { const auto count = 10; int checked = false; std::thread thread(PLATFORM_SPECIFIC_CAPTURE(&checked)() { fml::MessageLoop::EnsureInitializedForCurrentThread(); auto& loop = fml::MessageLoop::GetCurrent(); for (int target_ms = 0 + 2; target_ms < count + 2; target_ms++) { auto begin = fml::TimePoint::Now(); loop.GetTaskRunner()->PostDelayedTask( PLATFORM_SPECIFIC_CAPTURE(begin, target_ms, &checked)() { auto delta = fml::TimePoint::Now() - begin; auto ms = delta.ToMillisecondsF(); ASSERT_GE(ms, target_ms - 2); ASSERT_LE(ms, target_ms + 2); checked++; if (checked == count) { fml::MessageLoop::GetCurrent().Terminate(); } }, fml::TimeDelta::FromMilliseconds(target_ms)); } loop.Run(); }); thread.join(); ASSERT_EQ(checked, count); } TEST(MessageLoop, TIME_SENSITIVE(MultipleDelayedTasksWithDecreasingDeltas)) { const auto count = 10; int checked = false; std::thread thread(PLATFORM_SPECIFIC_CAPTURE(&checked)() { fml::MessageLoop::EnsureInitializedForCurrentThread(); auto& loop = fml::MessageLoop::GetCurrent(); for (int target_ms = count + 2; target_ms > 0 + 2; target_ms--) { auto begin = fml::TimePoint::Now(); loop.GetTaskRunner()->PostDelayedTask( PLATFORM_SPECIFIC_CAPTURE(begin, target_ms, &checked)() { auto delta = fml::TimePoint::Now() - begin; auto ms = delta.ToMillisecondsF(); ASSERT_GE(ms, target_ms - 2); ASSERT_LE(ms, target_ms + 2); checked++; if (checked == count) { fml::MessageLoop::GetCurrent().Terminate(); } }, fml::TimeDelta::FromMilliseconds(target_ms)); } loop.Run(); }); thread.join(); ASSERT_EQ(checked, count); } TEST(MessageLoop, TaskObserverFire) { bool started = false; bool terminated = false; std::thread thread([&started, &terminated]() { fml::MessageLoop::EnsureInitializedForCurrentThread(); const size_t count = 25; auto& loop = fml::MessageLoop::GetCurrent(); size_t task_count = 0; size_t obs_count = 0; auto obs = PLATFORM_SPECIFIC_CAPTURE(&obs_count)() { obs_count++; }; for (size_t i = 0; i < count; i++) { loop.GetTaskRunner()->PostTask( PLATFORM_SPECIFIC_CAPTURE(&terminated, i, &task_count)() { ASSERT_EQ(task_count, i); task_count++; if (count == i + 1) { fml::MessageLoop::GetCurrent().Terminate(); terminated = true; } }); } loop.AddTaskObserver(0, obs); loop.Run(); ASSERT_EQ(task_count, count); ASSERT_EQ(obs_count, count); started = true; }); thread.join(); ASSERT_TRUE(started); ASSERT_TRUE(terminated); } TEST(MessageLoop, CanCreateConcurrentMessageLoop) { fml::MessageLoop loop(fml::MessageLoop::Type::kConcurrent); auto task_runner = loop.GetTaskRunner(); const size_t kCount = 10; fml::CountDownLatch latch(kCount); for (size_t i = 0; i < kCount; ++i) { task_runner->PostTask([&latch]() { std::this_thread::sleep_for(std::chrono::milliseconds(5)); std::cout << "Ran on thread: " << std::this_thread::get_id() << std::endl; latch.CountDown(); }); } latch.Wait(); } TEST(MessageLoop, CanSwapMessageLoopsAndPreserveThreadConfiguration) { // synchronization notes: // 1. term1 and term2 are to wait for Swap. // 2. task_started_1 is to wait for the task runners // to signal that they are done. // 3. loop_init_1 and loop_init_2 are to wait for the message loops to // get initialized. fml::MessageLoop* loop1 = nullptr; fml::AutoResetWaitableEvent loop_init_1; fml::AutoResetWaitableEvent task_started_1; fml::AutoResetWaitableEvent term1; std::thread thread1([&loop1, &loop_init_1, &term1, &task_started_1]() { fml::MessageLoop::EnsureInitializedForCurrentThread(); loop1 = &fml::MessageLoop::GetCurrent(); // this task will be run on thread1 after Swap. loop1->GetTaskRunner()->PostTask([&task_started_1]() { task_started_1.Signal(); fml::MessageLoop::GetCurrent().Terminate(); }); loop_init_1.Signal(); term1.Wait(); loop1->Run(); }); loop_init_1.Wait(); fml::MessageLoop* loop2 = nullptr; fml::AutoResetWaitableEvent loop_init_2; fml::AutoResetWaitableEvent task_started_2; fml::AutoResetWaitableEvent term2; std::thread thread2( [&loop2, &loop_init_2, &term2, &task_started_2, &loop1]() { fml::MessageLoop::EnsureInitializedForCurrentThread(); loop2 = &fml::MessageLoop::GetCurrent(); // this task will be run on thread1 after Swap. loop2->GetTaskRunner()->PostTask([&task_started_2, &loop1]() { // ensure that we run the task on loop1 after the swap. ASSERT_TRUE(loop1 == &fml::MessageLoop::GetCurrent()); task_started_2.Signal(); fml::MessageLoop::GetCurrent().Terminate(); }); loop_init_2.Signal(); term2.Wait(); loop2->Run(); }); loop_init_2.Wait(); // swap the loops. loop1->SwapTaskQueues(loop2); // thread_1 should wait for tr_term2 latch. term1.Signal(); task_started_2.Wait(); // thread_2 should wait for tr_term2 latch. term2.Signal(); task_started_1.Wait(); thread1.join(); thread2.join(); } TEST(MessageLoop, TIME_SENSITIVE(DelayedTaskSwap)) { // Task execution order: // time (ms): 0 10 20 30 40 // thread 1: A1 A2 A3 A4 TERM // thread 2: B1 B2 B3 TERM // At time 15, we swap thread 1 and 2, and assert // that tasks run on the right threads. std::thread::id t1, t2; fml::AutoResetWaitableEvent tid_1, tid_2; fml::MessageLoop* loop1 = nullptr; fml::MessageLoop* loop2 = nullptr; std::thread thread_1([&loop1, &t1, &t2, &tid_1, &tid_2]() { t1 = std::this_thread::get_id(); tid_1.Signal(); tid_2.Wait(); fml::MessageLoop::EnsureInitializedForCurrentThread(); loop1 = &fml::MessageLoop::GetCurrent(); for (int t = 0; t <= 4; t++) { loop1->GetTaskRunner()->PostDelayedTask( [t, &t1, &t2]() { auto cur_tid = std::this_thread::get_id(); if (t <= 1) { ASSERT_EQ(cur_tid, t1); } else { ASSERT_EQ(cur_tid, t2); } if (t == 4) { fml::MessageLoop::GetCurrent().Terminate(); } }, fml::TimeDelta::FromMilliseconds(t * 10)); } loop1->Run(); }); std::thread thread_2([&loop2, &t1, &t2, &tid_1, &tid_2]() { t2 = std::this_thread::get_id(); tid_2.Signal(); tid_1.Wait(); fml::MessageLoop::EnsureInitializedForCurrentThread(); loop2 = &fml::MessageLoop::GetCurrent(); for (int t = 1; t <= 4; t++) { loop2->GetTaskRunner()->PostDelayedTask( [t, &t1, &t2]() { auto cur_tid = std::this_thread::get_id(); if (t <= 1) { ASSERT_EQ(cur_tid, t2); } else { ASSERT_EQ(cur_tid, t1); } if (t == 4) { fml::MessageLoop::GetCurrent().Terminate(); } }, fml::TimeDelta::FromMilliseconds(t * 10)); } loop2->Run(); }); // on main thread we swap the threads at 15 ms. std::this_thread::sleep_for(std::chrono::milliseconds(15)); loop1->SwapTaskQueues(loop2); thread_1.join(); thread_2.join(); }