mirror of
https://github.com/flutter/flutter.git
synced 2026-02-20 02:29:02 +08:00
Refactor: move Task Queue to its own class (flutter/engine#9301)
* Refactor to move Task Queue to its own class - This is to help with sharing task queue among multiple message loops going forward. - currently there is 1:1 mapping between task queue and message loop, we are still maintaining the semantics for this change. * Add mutex include * Add unit tests for task queue * fix formatting * license
This commit is contained in:
parent
4b958d852b
commit
f751033171
@ -134,6 +134,9 @@ FILE: ../../../flutter/fml/message_loop.cc
|
||||
FILE: ../../../flutter/fml/message_loop.h
|
||||
FILE: ../../../flutter/fml/message_loop_impl.cc
|
||||
FILE: ../../../flutter/fml/message_loop_impl.h
|
||||
FILE: ../../../flutter/fml/message_loop_task_queue.cc
|
||||
FILE: ../../../flutter/fml/message_loop_task_queue.h
|
||||
FILE: ../../../flutter/fml/message_loop_task_queue_unittests.cc
|
||||
FILE: ../../../flutter/fml/message_loop_unittests.cc
|
||||
FILE: ../../../flutter/fml/message_unittests.cc
|
||||
FILE: ../../../flutter/fml/native_library.h
|
||||
|
||||
@ -46,6 +46,8 @@ source_set("fml") {
|
||||
"message_loop.h",
|
||||
"message_loop_impl.cc",
|
||||
"message_loop_impl.h",
|
||||
"message_loop_task_queue.cc",
|
||||
"message_loop_task_queue.h",
|
||||
"native_library.h",
|
||||
"paths.cc",
|
||||
"paths.h",
|
||||
@ -198,6 +200,7 @@ executable("fml_unittests") {
|
||||
"file_unittest.cc",
|
||||
"memory/ref_counted_unittest.cc",
|
||||
"memory/weak_ptr_unittest.cc",
|
||||
"message_loop_task_queue_unittests.cc",
|
||||
"message_loop_unittests.cc",
|
||||
"message_unittests.cc",
|
||||
"paths_unittests.cc",
|
||||
|
||||
@ -39,13 +39,22 @@ fml::RefPtr<MessageLoopImpl> MessageLoopImpl::Create() {
|
||||
#endif
|
||||
}
|
||||
|
||||
MessageLoopImpl::MessageLoopImpl() : order_(0), terminated_(false) {}
|
||||
MessageLoopImpl::MessageLoopImpl() : terminated_(false) {
|
||||
task_queue_ = std::make_unique<MessageLoopTaskQueue>();
|
||||
}
|
||||
|
||||
MessageLoopImpl::~MessageLoopImpl() = default;
|
||||
|
||||
void MessageLoopImpl::PostTask(fml::closure task, fml::TimePoint target_time) {
|
||||
FML_DCHECK(task != nullptr);
|
||||
RegisterTask(task, target_time);
|
||||
FML_DCHECK(task != nullptr);
|
||||
if (terminated_) {
|
||||
// If the message loop has already been terminated, PostTask should destruct
|
||||
// |task| synchronously within this function.
|
||||
return;
|
||||
}
|
||||
const auto wake_up = task_queue_->RegisterTask(task, target_time);
|
||||
WakeUp(wake_up);
|
||||
}
|
||||
|
||||
void MessageLoopImpl::AddTaskObserver(intptr_t key, fml::closure callback) {
|
||||
@ -53,16 +62,14 @@ void MessageLoopImpl::AddTaskObserver(intptr_t key, fml::closure callback) {
|
||||
FML_DCHECK(MessageLoop::GetCurrent().GetLoopImpl().get() == this)
|
||||
<< "Message loop task observer must be added on the same thread as the "
|
||||
"loop.";
|
||||
std::lock_guard<std::mutex> observers_lock(observers_mutex_);
|
||||
task_observers_[key] = std::move(callback);
|
||||
task_queue_->AddTaskObserver(key, callback);
|
||||
}
|
||||
|
||||
void MessageLoopImpl::RemoveTaskObserver(intptr_t key) {
|
||||
FML_DCHECK(MessageLoop::GetCurrent().GetLoopImpl().get() == this)
|
||||
<< "Message loop task observer must be removed from the same thread as "
|
||||
"the loop.";
|
||||
std::lock_guard<std::mutex> observers_lock(observers_mutex_);
|
||||
task_observers_.erase(key);
|
||||
task_queue_->RemoveTaskObserver(key);
|
||||
}
|
||||
|
||||
void MessageLoopImpl::DoRun() {
|
||||
@ -88,8 +95,7 @@ void MessageLoopImpl::DoRun() {
|
||||
// should be destructed on the message loop's thread. We have just returned
|
||||
// from the implementations |Run| method which we know is on the correct
|
||||
// thread. Drop all pending tasks on the floor.
|
||||
std::lock_guard<std::mutex> lock(delayed_tasks_mutex_);
|
||||
delayed_tasks_ = {};
|
||||
task_queue_->Dispose();
|
||||
}
|
||||
|
||||
void MessageLoopImpl::DoTerminate() {
|
||||
@ -109,31 +115,8 @@ void MessageLoopImpl::SwapTaskQueues(const fml::RefPtr<MessageLoopImpl>& other)
|
||||
std::unique_lock<std::mutex> t2(other->tasks_flushing_mutex_,
|
||||
std::defer_lock);
|
||||
|
||||
// task_observers locks
|
||||
std::unique_lock<std::mutex> o1(observers_mutex_, std::defer_lock);
|
||||
std::unique_lock<std::mutex> o2(other->observers_mutex_, std::defer_lock);
|
||||
|
||||
// delayed_tasks locks
|
||||
std::unique_lock<std::mutex> d1(delayed_tasks_mutex_, std::defer_lock);
|
||||
std::unique_lock<std::mutex> d2(other->delayed_tasks_mutex_, std::defer_lock);
|
||||
|
||||
std::lock(t1, t2, o1, o2, d1, d2);
|
||||
|
||||
std::swap(task_observers_, other->task_observers_);
|
||||
std::swap(delayed_tasks_, other->delayed_tasks_);
|
||||
}
|
||||
|
||||
void MessageLoopImpl::RegisterTask(fml::closure task,
|
||||
fml::TimePoint target_time) {
|
||||
FML_DCHECK(task != nullptr);
|
||||
if (terminated_) {
|
||||
// If the message loop has already been terminated, PostTask should destruct
|
||||
// |task| synchronously within this function.
|
||||
return;
|
||||
}
|
||||
std::lock_guard<std::mutex> lock(delayed_tasks_mutex_);
|
||||
delayed_tasks_.push({++order_, std::move(task), target_time});
|
||||
WakeUp(delayed_tasks_.top().GetTargetTime());
|
||||
std::lock(t1, t2);
|
||||
task_queue_->Swap(*other->task_queue_);
|
||||
}
|
||||
|
||||
void MessageLoopImpl::FlushTasks(FlushType type) {
|
||||
@ -148,36 +131,12 @@ void MessageLoopImpl::FlushTasks(FlushType type) {
|
||||
// will lead us to run invocations on the wrong thread.
|
||||
std::lock_guard<std::mutex> task_flush_lock(tasks_flushing_mutex_);
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(delayed_tasks_mutex_);
|
||||
|
||||
if (delayed_tasks_.empty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
auto now = fml::TimePoint::Now();
|
||||
while (!delayed_tasks_.empty()) {
|
||||
const auto& top = delayed_tasks_.top();
|
||||
if (top.GetTargetTime() > now) {
|
||||
break;
|
||||
}
|
||||
invocations.emplace_back(std::move(top.GetTask()));
|
||||
delayed_tasks_.pop();
|
||||
if (type == FlushType::kSingle) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
WakeUp(delayed_tasks_.empty() ? fml::TimePoint::Max()
|
||||
: delayed_tasks_.top().GetTargetTime());
|
||||
}
|
||||
const auto wake_up = task_queue_->GetTasksToRunNow(type, invocations);
|
||||
WakeUp(wake_up);
|
||||
|
||||
for (const auto& invocation : invocations) {
|
||||
invocation();
|
||||
std::lock_guard<std::mutex> observers_lock(observers_mutex_);
|
||||
for (const auto& observer : task_observers_) {
|
||||
observer.second();
|
||||
}
|
||||
task_queue_->NotifyObservers();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
#include "flutter/fml/macros.h"
|
||||
#include "flutter/fml/memory/ref_counted.h"
|
||||
#include "flutter/fml/message_loop.h"
|
||||
#include "flutter/fml/message_loop_task_queue.h"
|
||||
#include "flutter/fml/synchronization/thread_annotations.h"
|
||||
#include "flutter/fml/time/time_point.h"
|
||||
|
||||
@ -61,21 +62,9 @@ class MessageLoopImpl : public fml::RefCountedThreadSafe<MessageLoopImpl> {
|
||||
private:
|
||||
std::mutex tasks_flushing_mutex_;
|
||||
|
||||
std::mutex observers_mutex_;
|
||||
std::map<intptr_t, fml::closure> task_observers_
|
||||
FML_GUARDED_BY(observers_mutex_);
|
||||
|
||||
std::mutex delayed_tasks_mutex_;
|
||||
DelayedTaskQueue delayed_tasks_ FML_GUARDED_BY(delayed_tasks_mutex_);
|
||||
size_t order_ FML_GUARDED_BY(delayed_tasks_mutex_);
|
||||
std::unique_ptr<MessageLoopTaskQueue> task_queue_;
|
||||
std::atomic_bool terminated_;
|
||||
|
||||
void RegisterTask(fml::closure task, fml::TimePoint target_time);
|
||||
|
||||
enum class FlushType {
|
||||
kSingle,
|
||||
kAll,
|
||||
};
|
||||
void FlushTasks(FlushType type);
|
||||
|
||||
FML_DISALLOW_COPY_AND_ASSIGN(MessageLoopImpl);
|
||||
|
||||
97
engine/src/flutter/fml/message_loop_task_queue.cc
Normal file
97
engine/src/flutter/fml/message_loop_task_queue.cc
Normal file
@ -0,0 +1,97 @@
|
||||
// Copyright 2013 The Flutter Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file.
|
||||
|
||||
#define FML_USED_ON_EMBEDDER
|
||||
|
||||
#include "flutter/fml/message_loop_task_queue.h"
|
||||
|
||||
namespace fml {
|
||||
|
||||
MessageLoopTaskQueue::MessageLoopTaskQueue() : order_(0) {}
|
||||
|
||||
MessageLoopTaskQueue::~MessageLoopTaskQueue() = default;
|
||||
|
||||
void MessageLoopTaskQueue::Dispose() {
|
||||
std::lock_guard<std::mutex> lock(delayed_tasks_mutex_);
|
||||
delayed_tasks_ = {};
|
||||
}
|
||||
|
||||
fml::TimePoint MessageLoopTaskQueue::RegisterTask(fml::closure task,
|
||||
fml::TimePoint target_time) {
|
||||
std::lock_guard<std::mutex> lock(delayed_tasks_mutex_);
|
||||
delayed_tasks_.push({++order_, std::move(task), target_time});
|
||||
return delayed_tasks_.top().GetTargetTime();
|
||||
}
|
||||
|
||||
bool MessageLoopTaskQueue::HasPendingTasks() {
|
||||
std::lock_guard<std::mutex> lock(delayed_tasks_mutex_);
|
||||
return !delayed_tasks_.empty();
|
||||
}
|
||||
|
||||
fml::TimePoint MessageLoopTaskQueue::GetTasksToRunNow(
|
||||
FlushType type,
|
||||
std::vector<fml::closure>& invocations) {
|
||||
std::lock_guard<std::mutex> lock(delayed_tasks_mutex_);
|
||||
|
||||
const auto now = fml::TimePoint::Now();
|
||||
while (!delayed_tasks_.empty()) {
|
||||
const auto& top = delayed_tasks_.top();
|
||||
if (top.GetTargetTime() > now) {
|
||||
break;
|
||||
}
|
||||
invocations.emplace_back(std::move(top.GetTask()));
|
||||
delayed_tasks_.pop();
|
||||
if (type == FlushType::kSingle) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (delayed_tasks_.empty()) {
|
||||
return fml::TimePoint::Max();
|
||||
} else {
|
||||
return delayed_tasks_.top().GetTargetTime();
|
||||
}
|
||||
}
|
||||
|
||||
size_t MessageLoopTaskQueue::GetNumPendingTasks() {
|
||||
std::lock_guard<std::mutex> lock(delayed_tasks_mutex_);
|
||||
return delayed_tasks_.size();
|
||||
}
|
||||
|
||||
void MessageLoopTaskQueue::AddTaskObserver(intptr_t key,
|
||||
fml::closure callback) {
|
||||
std::lock_guard<std::mutex> observers_lock(observers_mutex_);
|
||||
task_observers_[key] = std::move(callback);
|
||||
}
|
||||
|
||||
void MessageLoopTaskQueue::RemoveTaskObserver(intptr_t key) {
|
||||
std::lock_guard<std::mutex> observers_lock(observers_mutex_);
|
||||
task_observers_.erase(key);
|
||||
}
|
||||
|
||||
void MessageLoopTaskQueue::NotifyObservers() {
|
||||
std::lock_guard<std::mutex> observers_lock(observers_mutex_);
|
||||
for (const auto& observer : task_observers_) {
|
||||
observer.second();
|
||||
}
|
||||
}
|
||||
|
||||
// Thread safety analysis disabled as it does not account for defered locks.
|
||||
void MessageLoopTaskQueue::Swap(MessageLoopTaskQueue& other)
|
||||
FML_NO_THREAD_SAFETY_ANALYSIS {
|
||||
// task_observers locks
|
||||
std::unique_lock<std::mutex> o1(observers_mutex_, std::defer_lock);
|
||||
std::unique_lock<std::mutex> o2(other.observers_mutex_, std::defer_lock);
|
||||
|
||||
// delayed_tasks locks
|
||||
std::unique_lock<std::mutex> d1(delayed_tasks_mutex_, std::defer_lock);
|
||||
std::unique_lock<std::mutex> d2(other.delayed_tasks_mutex_, std::defer_lock);
|
||||
|
||||
std::lock(o1, o2, d1, d2);
|
||||
|
||||
std::swap(task_observers_, other.task_observers_);
|
||||
std::swap(delayed_tasks_, other.delayed_tasks_);
|
||||
}
|
||||
|
||||
} // namespace fml
|
||||
75
engine/src/flutter/fml/message_loop_task_queue.h
Normal file
75
engine/src/flutter/fml/message_loop_task_queue.h
Normal file
@ -0,0 +1,75 @@
|
||||
// Copyright 2013 The Flutter Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file.
|
||||
|
||||
#ifndef FLUTTER_FML_MESSAGE_LOOP_TASK_QUEUE_H_
|
||||
#define FLUTTER_FML_MESSAGE_LOOP_TASK_QUEUE_H_
|
||||
|
||||
#include <map>
|
||||
#include <mutex>
|
||||
#include <vector>
|
||||
|
||||
#include "flutter/fml/closure.h"
|
||||
#include "flutter/fml/delayed_task.h"
|
||||
#include "flutter/fml/macros.h"
|
||||
#include "flutter/fml/memory/ref_counted.h"
|
||||
#include "flutter/fml/synchronization/thread_annotations.h"
|
||||
|
||||
namespace fml {
|
||||
|
||||
enum class FlushType {
|
||||
kSingle,
|
||||
kAll,
|
||||
};
|
||||
|
||||
// This class keeps track of all the tasks and observers that
|
||||
// need to be run on it's MessageLoopImpl.
|
||||
class MessageLoopTaskQueue {
|
||||
public:
|
||||
// Lifecycle.
|
||||
|
||||
MessageLoopTaskQueue();
|
||||
|
||||
~MessageLoopTaskQueue();
|
||||
|
||||
void Dispose();
|
||||
|
||||
// Tasks methods.
|
||||
|
||||
fml::TimePoint RegisterTask(fml::closure task, fml::TimePoint target_time);
|
||||
|
||||
bool HasPendingTasks();
|
||||
|
||||
// Returns the wake up time.
|
||||
fml::TimePoint GetTasksToRunNow(FlushType type,
|
||||
std::vector<fml::closure>& invocations);
|
||||
|
||||
size_t GetNumPendingTasks();
|
||||
|
||||
// Observers methods.
|
||||
|
||||
void AddTaskObserver(intptr_t key, fml::closure callback);
|
||||
|
||||
void RemoveTaskObserver(intptr_t key);
|
||||
|
||||
void NotifyObservers();
|
||||
|
||||
// Misc.
|
||||
|
||||
void Swap(MessageLoopTaskQueue& other);
|
||||
|
||||
private:
|
||||
std::mutex observers_mutex_;
|
||||
std::map<intptr_t, fml::closure> task_observers_
|
||||
FML_GUARDED_BY(observers_mutex_);
|
||||
|
||||
std::mutex delayed_tasks_mutex_;
|
||||
DelayedTaskQueue delayed_tasks_ FML_GUARDED_BY(delayed_tasks_mutex_);
|
||||
size_t order_ FML_GUARDED_BY(delayed_tasks_mutex_);
|
||||
|
||||
FML_DISALLOW_COPY_ASSIGN_AND_MOVE(MessageLoopTaskQueue);
|
||||
};
|
||||
|
||||
} // namespace fml
|
||||
|
||||
#endif // FLUTTER_FML_MESSAGE_LOOP_TASK_QUEUE_H_
|
||||
70
engine/src/flutter/fml/message_loop_task_queue_unittests.cc
Normal file
70
engine/src/flutter/fml/message_loop_task_queue_unittests.cc
Normal file
@ -0,0 +1,70 @@
|
||||
// Copyright 2013 The Flutter Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file.
|
||||
|
||||
#define FML_USED_ON_EMBEDDER
|
||||
|
||||
#include "flutter/fml/message_loop_task_queue.h"
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
TEST(MessageLoopTaskQueue, StartsWithNoPendingTasks) {
|
||||
auto task_queue = std::make_unique<fml::MessageLoopTaskQueue>();
|
||||
ASSERT_FALSE(task_queue->HasPendingTasks());
|
||||
}
|
||||
|
||||
TEST(MessageLoopTaskQueue, RegisterOneTask) {
|
||||
auto task_queue = std::make_unique<fml::MessageLoopTaskQueue>();
|
||||
const auto time = fml::TimePoint::Max();
|
||||
const auto wake_time = task_queue->RegisterTask([] {}, time);
|
||||
ASSERT_TRUE(task_queue->HasPendingTasks());
|
||||
ASSERT_TRUE(task_queue->GetNumPendingTasks() == 1);
|
||||
ASSERT_TRUE(wake_time == time);
|
||||
}
|
||||
|
||||
TEST(MessageLoopTaskQueue, RegisterTwoTasksAndCount) {
|
||||
auto task_queue = std::make_unique<fml::MessageLoopTaskQueue>();
|
||||
task_queue->RegisterTask([] {}, fml::TimePoint::Now());
|
||||
task_queue->RegisterTask([] {}, fml::TimePoint::Max());
|
||||
ASSERT_TRUE(task_queue->HasPendingTasks());
|
||||
ASSERT_TRUE(task_queue->GetNumPendingTasks() == 2);
|
||||
}
|
||||
|
||||
TEST(MessageLoopTaskQueue, PreserveTaskOrdering) {
|
||||
auto task_queue = std::make_unique<fml::MessageLoopTaskQueue>();
|
||||
int test_val = 0;
|
||||
|
||||
// order: 0
|
||||
task_queue->RegisterTask([&test_val]() { test_val = 1; },
|
||||
fml::TimePoint::Now());
|
||||
|
||||
// order: 1
|
||||
task_queue->RegisterTask([&test_val]() { test_val = 2; },
|
||||
fml::TimePoint::Now());
|
||||
|
||||
std::vector<fml::closure> invocations;
|
||||
task_queue->GetTasksToRunNow(fml::FlushType::kAll, invocations);
|
||||
|
||||
int expected_value = 1;
|
||||
|
||||
for (auto& invocation : invocations) {
|
||||
invocation();
|
||||
ASSERT_TRUE(test_val == expected_value);
|
||||
expected_value++;
|
||||
}
|
||||
}
|
||||
|
||||
TEST(MessageLoopTaskQueue, AddRemoveNotifyObservers) {
|
||||
auto task_queue = std::make_unique<fml::MessageLoopTaskQueue>();
|
||||
|
||||
int test_val = 0;
|
||||
intptr_t key = 123;
|
||||
|
||||
task_queue->AddTaskObserver(key, [&test_val]() { test_val = 1; });
|
||||
task_queue->NotifyObservers();
|
||||
ASSERT_TRUE(test_val == 1);
|
||||
|
||||
test_val = 0;
|
||||
task_queue->RemoveTaskObserver(key);
|
||||
task_queue->NotifyObservers();
|
||||
ASSERT_TRUE(test_val == 0);
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user