Resize channel buffers (#12402)

Made it so you can resize channel buffers by sending messages to them.
This commit is contained in:
gaaclarke 2019-10-01 10:31:25 -07:00 committed by GitHub
parent 438db22e7c
commit f407e06970
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 112 additions and 11 deletions

View File

@ -112,6 +112,8 @@ class ChannelBuffers {
/// buffer size that will avoid any overflows.
static const int kDefaultBufferSize = 1;
static const String kControlChannelName = 'dev.flutter/channel-buffers';
/// A mapping between a channel name and its associated [_RingBuffer].
final Map<String, _RingBuffer<_StoredMessage>> _messages =
<String, _RingBuffer<_StoredMessage>>{};
@ -162,7 +164,7 @@ class ChannelBuffers {
///
/// This could result in the dropping of messages if newSize is less
/// than the current length of the queue.
void resize(String channel, int newSize) {
void _resize(String channel, int newSize) {
_RingBuffer<_StoredMessage> queue = _messages[channel];
if (queue == null) {
queue = _makeRingBuffer(newSize);
@ -185,6 +187,30 @@ class ChannelBuffers {
await callback(message.data, message.callback);
}
}
String _getString(ByteData data) {
final ByteBuffer buffer = data.buffer;
final Uint8List list = buffer.asUint8List(data.offsetInBytes, data.lengthInBytes);
return utf8.decode(list);
}
/// Handle a control message.
///
/// This is intended to be called by the platform messages dispatcher.
///
/// Available messages:
/// - Name: resize
/// Arity: 2
/// Format: `resize\r<channel name>\r<new size>`
/// Description: Allows you to set the size of a channel's buffer.
void handleMessage(ByteData data) {
final List<String> command = _getString(data).split('\r');
if (command.length == /*arity=*/2 + 1 && command[0] == 'resize') {
_resize(command[1], int.parse(command[2]));
} else {
throw Exception('Unrecognized command $command sent to $kControlChannelName.');
}
}
}
/// [ChannelBuffer]s that allow the storage of messages between the

View File

@ -150,7 +150,15 @@ void _updateAccessibilityFeatures(int values) {
@pragma('vm:entry-point')
void _dispatchPlatformMessage(String name, ByteData data, int responseId) {
if (window.onPlatformMessage != null) {
if (name == ChannelBuffers.kControlChannelName) {
try {
channelBuffers.handleMessage(data);
} catch (ex) {
_printDebug('Message to "$name" caused exception $ex');
} finally {
window._respondToPlatformMessage(responseId, null);
}
} else if (window.onPlatformMessage != null) {
_invoke3<String, ByteData, PlatformMessageResponseCallback>(
window.onPlatformMessage,
window._onPlatformMessageZone,

View File

@ -16,8 +16,8 @@ class ChannelBuffers {
return true;
}
/// Noop in web_ui, caches are always size zero.
void resize(String channel, int newSize) {}
/// A noop since the web_ui implementation doesn't handle any messages.
void handleMessage(ByteData data) {}
/// Remove and process all stored messages for a given channel.
///

View File

@ -9,6 +9,8 @@ import android.support.annotation.Nullable;
import android.support.annotation.UiThread;
import android.util.Log;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Locale;
import io.flutter.BuildConfig;
import io.flutter.plugin.common.BinaryMessenger.BinaryReply;
@ -29,6 +31,8 @@ import io.flutter.plugin.common.BinaryMessenger.BinaryMessageHandler;
*/
public final class BasicMessageChannel<T> {
private static final String TAG = "BasicMessageChannel#";
public static final String CHANNEL_BUFFERS_CHANNEL =
"dev.flutter/channel-buffers";
@NonNull
private final BinaryMessenger messenger;
@ -102,6 +106,18 @@ public final class BasicMessageChannel<T> {
handler == null ? null : new IncomingMessageHandler(handler));
}
/**
* Adjusts the number of messages that will get buffered when sending messages to
* channels that aren't fully setup yet. For example, the engine isn't running
* yet or the channel's message handler isn't setup on the Dart side yet.
*/
public void resizeChannelBuffer(int newSize) {
Charset charset = Charset.forName("UTF-8");
String messageString = String.format(Locale.US, "resize\r%s\r%d", name, newSize);
ByteBuffer message = ByteBuffer.wrap(messageString.getBytes(charset));
messenger.send(CHANNEL_BUFFERS_CHANNEL, message);
}
/**
* A handler of incoming messages.
*/

View File

@ -114,6 +114,14 @@ FLUTTER_EXPORT
* @param handler The message handler.
*/
- (void)setMessageHandler:(FlutterMessageHandler _Nullable)handler;
/**
* Adjusts the number of messages that will get buffered when sending messages to
* channels that aren't fully setup yet. For example, the engine isn't running
* yet or the channel's message handler isn't setup on the Dart side yet.
*/
- (void)resizeChannelBuffer:(NSInteger)newSize;
@end
/**

View File

@ -6,6 +6,8 @@
#pragma mark - Basic message channel
static NSString* const FlutterChannelBuffersChannel = @"dev.flutter/channel-buffers";
@implementation FlutterBasicMessageChannel {
NSObject<FlutterBinaryMessenger>* _messenger;
NSString* _name;
@ -70,6 +72,13 @@
};
[_messenger setMessageHandlerOnChannel:_name binaryMessageHandler:messageHandler];
}
- (void)resizeChannelBuffer:(NSInteger)newSize {
NSString* messageString = [NSString stringWithFormat:@"resize\r%@\r%@", _name, @(newSize)];
NSData* message = [messageString dataUsingEncoding:NSUTF8StringEncoding];
[_messenger sendOnChannel:FlutterChannelBuffersChannel message:message];
}
@end
#pragma mark - Method channel

View File

@ -139,4 +139,22 @@
[self waitForExpectationsWithTimeout:1.0 handler:nil];
}
- (void)testResize {
NSString* channelName = @"foo";
id binaryMessenger = OCMStrictProtocolMock(@protocol(FlutterBinaryMessenger));
id codec = OCMProtocolMock(@protocol(FlutterMethodCodec));
FlutterBasicMessageChannel* channel =
[[FlutterBasicMessageChannel alloc] initWithName:channelName
binaryMessenger:binaryMessenger
codec:codec];
XCTAssertNotNil(channel);
NSString* expectedMessageString =
[NSString stringWithFormat:@"resize\r%@\r%@", channelName, @100];
NSData* expectedMessage = [expectedMessageString dataUsingEncoding:NSUTF8StringEncoding];
OCMExpect([binaryMessenger sendOnChannel:@"dev.flutter/channel-buffers" message:expectedMessage]);
[channel resizeChannelBuffer:100];
OCMVerifyAll(binaryMessenger);
}
@end

View File

@ -18,6 +18,10 @@ void main() {
return utf8.decode(list);
}
void _resize(ui.ChannelBuffers buffers, String name, int newSize) {
buffers.handleMessage(_makeByteData("resize\r$name\r$newSize"));
}
test('push drain', () async {
String channel = "foo";
ByteData data = _makeByteData('bar');
@ -35,7 +39,7 @@ void main() {
ByteData data = _makeByteData('bar');
ui.ChannelBuffers buffers = ui.ChannelBuffers();
ui.PlatformMessageResponseCallback callback = (ByteData responseData) {};
buffers.resize(channel, 0);
_resize(buffers, channel, 0);
buffers.push(channel, data, callback);
bool didCall = false;
await buffers.drain(channel, (ByteData drainedData, ui.PlatformMessageResponseCallback drainedCallback) {
@ -64,7 +68,7 @@ void main() {
ByteData four = _makeByteData('four');
ui.ChannelBuffers buffers = ui.ChannelBuffers();
ui.PlatformMessageResponseCallback callback = (ByteData responseData) {};
buffers.resize(channel, 3);
_resize(buffers, channel, 3);
expect(buffers.push(channel, one, callback), equals(false));
expect(buffers.push(channel, two, callback), equals(false));
expect(buffers.push(channel, three, callback), equals(false));
@ -84,11 +88,11 @@ void main() {
ByteData one = _makeByteData('one');
ByteData two = _makeByteData('two');
ui.ChannelBuffers buffers = ui.ChannelBuffers();
buffers.resize(channel, 100);
_resize(buffers, channel, 100);
ui.PlatformMessageResponseCallback callback = (ByteData responseData) {};
expect(buffers.push(channel, one, callback), equals(false));
expect(buffers.push(channel, two, callback), equals(false));
buffers.resize(channel, 1);
_resize(buffers, channel, 1);
int counter = 0;
await buffers.drain(channel, (ByteData drainedData, ui.PlatformMessageResponseCallback drainedCallback) {
if (counter++ == 0) {
@ -109,10 +113,10 @@ void main() {
didCallCallback = true;
};
ui.PlatformMessageResponseCallback twoCallback = (ByteData responseData) {};
buffers.resize(channel, 100);
_resize(buffers, channel, 100);
expect(buffers.push(channel, one, oneCallback), equals(false));
expect(buffers.push(channel, two, twoCallback), equals(false));
buffers.resize(channel, 1);
_resize(buffers, channel, 1);
expect(didCallCallback, equals(true));
});
@ -126,9 +130,21 @@ void main() {
didCallCallback = true;
};
ui.PlatformMessageResponseCallback twoCallback = (ByteData responseData) {};
buffers.resize(channel, 1);
_resize(buffers, channel, 1);
expect(buffers.push(channel, one, oneCallback), equals(false));
expect(buffers.push(channel, two, twoCallback), equals(true));
expect(didCallCallback, equals(true));
});
test('handle garbage', () async {
ui.ChannelBuffers buffers = ui.ChannelBuffers();
expect(() => buffers.handleMessage(_makeByteData("asdfasdf")),
throwsException);
});
test('handle resize garbage', () async {
ui.ChannelBuffers buffers = ui.ChannelBuffers();
expect(() => buffers.handleMessage(_makeByteData("resize\rfoo\rbar")),
throwsException);
});
}