"Fossies" - the Fresh Open Source Software Archive

Member "flutter-1.22.4/packages/flutter_tools/lib/src/protocol_discovery.dart" (13 Nov 2020, 7176 Bytes) of package /linux/misc/flutter-1.22.4.tar.gz:


As a special service "Fossies" has tried to format the requested source page into HTML format using (guessed) Dart source code syntax highlighting (style: standard) with prefixed line numbers and code folding option. Alternatively you can here view or download the uninterpreted source code file.

    1 // Copyright 2014 The Flutter Authors. All rights reserved.
    2 // Use of this source code is governed by a BSD-style license that can be
    3 // found in the LICENSE file.
    4 
    5 import 'dart:async';
    6 
    7 import 'package:meta/meta.dart';
    8 
    9 import 'base/io.dart';
   10 import 'device.dart';
   11 import 'globals.dart' as globals;
   12 
   13 /// Discovers a specific service protocol on a device, and forwards the service
   14 /// protocol device port to the host.
   15 class ProtocolDiscovery {
   16   ProtocolDiscovery._(
   17     this.logReader,
   18     this.serviceName, {
   19     this.portForwarder,
   20     this.throttleDuration,
   21     this.hostPort,
   22     this.devicePort,
   23     this.ipv6,
   24   }) : assert(logReader != null)
   25   {
   26     _deviceLogSubscription = logReader.logLines.listen(
   27       _handleLine,
   28       onDone: _stopScrapingLogs,
   29     );
   30     _uriStreamController = _BufferedStreamController<Uri>();
   31   }
   32 
   33   factory ProtocolDiscovery.observatory(
   34     DeviceLogReader logReader, {
   35     DevicePortForwarder portForwarder,
   36     Duration throttleDuration = const Duration(milliseconds: 200),
   37     @required int hostPort,
   38     @required int devicePort,
   39     @required bool ipv6,
   40   }) {
   41     const String kObservatoryService = 'Observatory';
   42     return ProtocolDiscovery._(
   43       logReader,
   44       kObservatoryService,
   45       portForwarder: portForwarder,
   46       throttleDuration: throttleDuration,
   47       hostPort: hostPort,
   48       devicePort: devicePort,
   49       ipv6: ipv6,
   50     );
   51   }
   52 
   53   final DeviceLogReader logReader;
   54   final String serviceName;
   55   final DevicePortForwarder portForwarder;
   56   final int hostPort;
   57   final int devicePort;
   58   final bool ipv6;
   59 
   60   /// The time to wait before forwarding a new observatory URIs from [logReader].
   61   final Duration throttleDuration;
   62 
   63   StreamSubscription<String> _deviceLogSubscription;
   64   _BufferedStreamController<Uri> _uriStreamController;
   65 
   66   /// The discovered service URL.
   67   ///
   68   /// Returns null if the log reader shuts down before any uri is found.
   69   ///
   70   /// Use [uris] instead.
   71   // TODO(egarciad): replace `uri` for `uris`.
   72   Future<Uri> get uri async {
   73     try {
   74       return await uris.first;
   75     } on StateError {
   76       return null;
   77     }
   78   }
   79 
   80   /// The discovered service URLs.
   81   ///
   82   /// When a new observatory URL: is available in [logReader],
   83   /// the URLs are forwarded at most once every [throttleDuration].
   84   ///
   85   /// Port forwarding is only attempted when this is invoked,
   86   /// for each observatory URL in the stream.
   87   Stream<Uri> get uris {
   88     return _uriStreamController.stream
   89       .transform(_throttle<Uri>(
   90         waitDuration: throttleDuration,
   91       ))
   92       .asyncMap<Uri>(_forwardPort);
   93   }
   94 
   95   Future<void> cancel() => _stopScrapingLogs();
   96 
   97   Future<void> _stopScrapingLogs() async {
   98     await _uriStreamController?.close();
   99     await _deviceLogSubscription?.cancel();
  100     _deviceLogSubscription = null;
  101   }
  102 
  103   Match _getPatternMatch(String line) {
  104     final RegExp r = RegExp(RegExp.escape(serviceName) + r' listening on ((http|//)[a-zA-Z0-9:/=_\-\.\[\]]+)');
  105     return r.firstMatch(line);
  106   }
  107 
  108   Uri _getObservatoryUri(String line) {
  109     final Match match = _getPatternMatch(line);
  110     if (match != null) {
  111       return Uri.parse(match[1]);
  112     }
  113     return null;
  114   }
  115 
  116   void _handleLine(String line) {
  117     Uri uri;
  118     try {
  119       uri = _getObservatoryUri(line);
  120     } on FormatException catch (error, stackTrace) {
  121       _uriStreamController.addError(error, stackTrace);
  122     }
  123     if (uri == null) {
  124       return;
  125     }
  126     if (devicePort != null && uri.port != devicePort) {
  127       globals.printTrace('skipping potential observatory $uri due to device port mismatch');
  128       return;
  129     }
  130     _uriStreamController.add(uri);
  131   }
  132 
  133   Future<Uri> _forwardPort(Uri deviceUri) async {
  134     globals.printTrace('$serviceName URL on device: $deviceUri');
  135     Uri hostUri = deviceUri;
  136 
  137     if (portForwarder != null) {
  138       final int actualDevicePort = deviceUri.port;
  139       final int actualHostPort = await portForwarder.forward(actualDevicePort, hostPort: hostPort);
  140       globals.printTrace('Forwarded host port $actualHostPort to device port $actualDevicePort for $serviceName');
  141       hostUri = deviceUri.replace(port: actualHostPort);
  142     }
  143 
  144     assert(InternetAddress(hostUri.host).isLoopback);
  145     if (ipv6) {
  146       hostUri = hostUri.replace(host: InternetAddress.loopbackIPv6.host);
  147     }
  148     return hostUri;
  149   }
  150 }
  151 
  152 /// Provides a broadcast stream controller that buffers the events
  153 /// if there isn't a listener attached.
  154 /// The events are then delivered when a listener is attached to the stream.
  155 class _BufferedStreamController<T> {
  156   _BufferedStreamController() : _events = <dynamic>[];
  157 
  158   /// The stream that this controller is controlling.
  159   Stream<T> get stream {
  160     return _streamController.stream;
  161   }
  162 
  163   StreamController<T> _streamControllerInstance;
  164 
  165   StreamController<T> get _streamController {
  166     _streamControllerInstance ??= StreamController<T>.broadcast(onListen: () {
  167       for (final dynamic event in _events) {
  168         assert(T is! List);
  169         if (event is T) {
  170           _streamControllerInstance.add(event);
  171         } else {
  172           _streamControllerInstance.addError(
  173             event.first as Object,
  174             event.last as StackTrace,
  175           );
  176         }
  177       }
  178       _events.clear();
  179     });
  180     return _streamControllerInstance;
  181   }
  182 
  183   final List<dynamic> _events;
  184 
  185   /// Sends [event] if there is a listener attached to the broadcast stream.
  186   /// Otherwise, it enqueues [event] until a listener is attached.
  187   void add(T event) {
  188     if (_streamController.hasListener) {
  189       _streamController.add(event);
  190     } else {
  191       _events.add(event);
  192     }
  193   }
  194 
  195   /// Sends or enqueues an error event.
  196   void addError(Object error, [StackTrace stackTrace]) {
  197     if (_streamController.hasListener) {
  198       _streamController.addError(error, stackTrace);
  199     } else {
  200       _events.add(<dynamic>[error, stackTrace]);
  201     }
  202   }
  203 
  204   /// Closes the stream.
  205   Future<void> close() {
  206     return _streamController.close();
  207   }
  208 }
  209 
  210 /// This transformer will produce an event at most once every [waitDuration].
  211 ///
  212 /// For example, consider a `waitDuration` of `10ms`, and list of event names
  213 /// and arrival times: `a (0ms), b (5ms), c (11ms), d (21ms)`.
  214 /// The events `c` and `d` will be produced as a result.
  215 StreamTransformer<S, S> _throttle<S>({
  216   @required Duration waitDuration,
  217 }) {
  218   assert(waitDuration != null);
  219 
  220   S latestLine;
  221   int lastExecution;
  222   Future<void> throttleFuture;
  223 
  224   return StreamTransformer<S, S>
  225     .fromHandlers(
  226       handleData: (S value, EventSink<S> sink) {
  227         latestLine = value;
  228 
  229         final int currentTime = DateTime.now().millisecondsSinceEpoch;
  230         lastExecution ??= currentTime;
  231         final int remainingTime = currentTime - lastExecution;
  232         final int nextExecutionTime = remainingTime > waitDuration.inMilliseconds
  233           ? 0
  234           : waitDuration.inMilliseconds - remainingTime;
  235 
  236         throttleFuture ??= Future<void>
  237           .delayed(Duration(milliseconds: nextExecutionTime))
  238           .whenComplete(() {
  239             sink.add(latestLine);
  240             throttleFuture = null;
  241             lastExecution = DateTime.now().millisecondsSinceEpoch;
  242           });
  243       }
  244     );
  245 }