Dart: Streams

Carlos Costa

Stream é uma sequência de eventos assíncronos. Ou seja, é uma sequência de dados que são emitidos ao longo do tempo.

Exemplo básico:

var stm = Stream.fromIterable([1, 2, 3]);

stm.listen((event) {
  print(event);
});

// output: 1 2 3

Nesse exemplo, criamos um stream a partir de uma lista de inteiros e escutamos os eventos que são emitidos por ele. O método listen recebe uma função que será executada a cada evento emitido pelo stream.

Transformações


É possível transformar os valores emitidos por um stream antes de escutá-los

var stm = Stream.fromIterable([1, 2, 3, 4, 5, 6]);

stm
.map((event) => event * 2) // primeira transformação
.where((event) => event % 2 == 0) // segunda transformação
.listen((event) {
  print(event);
});

// output: 2 4 6

Aqui nesse exemplo, criamos um stream a partir de uma lista de inteiros e aplicamos duas transformações:

  • A primeira transformação multiplica cada valor emitido pelo stream por 2.
  • A segunda transformação filtra os valores emitidos pelo stream, retornando apenas os valores pares.

É possível aplicar várias transformações com os métodos: map, where, expand, take, skip, takeWhile, skipWhile, distinct, debounce, etc..

Referencia completa: Stream Class

Stream.periodic


var stm = Stream.periodic(Duration(seconds: 1), (event) => event);

stm.listen((event) {
  print(event);
});

// output: 0 1 2 3 4 5 6 7 8 9 ...

O método periodic cria um stream que emite um evento a cada período de tempo.

Stream.fromFuture


var stm = Stream.fromFuture(Future.delayed(Duration(seconds: 1), () => 1));

stm.listen((event) {
  print(event);
});

// output: 1

O método fromFuture cria uma stream que emite um evento quando a future é resolvida. No nosso exemplo, a future é resolvida após 1 segundo e a stream emite o valor 1.

Stream.fromFutures


var stm = Stream.fromFutures([
  Future.delayed(Duration(seconds: 1), () => 1),
  Future.delayed(Duration(seconds: 2), () => 2),
  Future.delayed(Duration(seconds: 3), () => 3),
]);

stm.listen((event) {
  print(event);
});

O método fromFutures functiona de forma semelhante ao fromFuture, porém recebe uma lista de futures e emite um evento para cada future resolvida.

Stream e Generators


Generators são funções especiais que podem produzir umas sequências de valores. essas funções pode ser sincronas e retornar um Iterable ou assíncronas e retornar um Stream.

Exemplo:

Iterable<int> itr() sync* {
  yield 1;
  yield 2;
  yield 3;
}

itr().forEach((element) {
  print(element);
});

// output: 1 2 3

Nesse Exemplo criamos um generator que retorna um Iterable de inteiros, e iteramos sobre ele com o método forEach.

Combinando generators com streams:

Stream<int> stm() async* {
  for (var i = 0; i < 10; i++) {
    yield i;
  }
}

stm().listen((event) {
  print(event);
});

// output: 0 1 2 3 4 5 6 7 8 9

A função que retorna um stream é marcada com a palavra-chave async* e dentro dela usamos a palavra-chave yield para retornar os valores.

Resumindo:

  • sync*: retorna um Iterable
  • async*: retorna um Stream

StreamController


StreamController é um objeto que pode ser usado para criar e controlar um stream.

Através do controller é possível adicionar eventos ao stream e escutar os eventos emitidos.

Exemplo:

var stm = StreamController<int>();

stm.stream.listen((event) {
  print(event);
});

stm.sink.add(1);
stm.sink.add(2);
stm.sink.add(3);

// output: 1 2 3

No Exemplo anterior, criamos um streamController e adicionamos 3 eventos.

Os eventos também podem ser adicionados de forma periódica:

var stm = StreamController<int>();

stm.stream.listen((event) {
  print(event);
});

Timer.periodic(Duration(seconds: 1), (timer) {
  stm.sink.add(timer.tick);
});

//output: 1 2 3 4 5...

Subscriptions


Subscriptions são objetos que representam uma conexão entre um stream e um listener.

Através de um subscription é possível cancelar a escuta de um stream.

Exemplo:

var stm = Stream.periodic(Duration(seconds: 1), (event) => event);

var subscription = stm.listen((event) {
  print(event);
});

Timer(Duration(seconds: 5), () {
  subscription.cancel();
});

// output: 0 1 2 3 4

Nesse exemplo, criamos um stream que emite um evento a cada segundo e escutamos os eventos emitidos. Após 5 segundos cancelamos a escuta.

Broadcast


Broadcast é um tipo de stream que permite que vários listeners escutem os eventos emitidos.

Por padrão os streams são single-subscription, ou seja, apenas um listener pode escutar os eventos emitidos.

var stm = Stream
  .periodic(Duration(seconds: 1), (event) => event)

stm.listen((event) {
  print('listener 1: $event');
});

stm.listen((event) {
  print('listener 2: $event');
});

Executando esse exemplo teremos os seguinte error: 👇

Unhandled exception:
Bad state: Stream has already been listened to.

Usando o método asBroadcastStream podemos transformar um stream single-subscription em um stream broadcast, dessa forma podemos escutar os eventos emitidos em vários listeners.

var stm = Stream
  .periodic(Duration(seconds: 1), (event) => event)
  .asBroadcastStream();

stm.listen((event) {
  print('listener 1: $event');
});

stm.listen((event) {
  print('listener 2: $event');
});

// output:
// listener 1: 0
// listener 2: 0
// listener 1: 1
// listener 2: 1
// listener 1: 2
// listener 2: 2 ...

StreamTransformer


StreamTransformer é um objecto que pode ser usado para transformar um stream.

Exemplo:

var stm = Stream
  .periodic(Duration(seconds: 1), (event) => event)
  .asBroadcastStream();

var transformer = StreamTransformer<int, String>.fromHandlers(
  handleData: (data, sink) {
    sink.add('value: $data');
  },
);

stm.transform(transformer).listen((event) {
  print(event);
});

// output: value: 0 value: 1 value: 2 value: 3 ...

Nesse exemplo, criamos um stream que emite um evento a cada segundo e transformamos os valores emitidos em strings.

Lendo arquivos com Streams


import 'dart:convert';
import 'dart:io';
import 'dart:math';

createFile() {
  // criando arquivo
  var file = File('random.txt');

  // criando um objeto randomico
  var random = Random.secure();

  // criando um objeto que escreve no arquivo
  var sink = file.openWrite();

  // escrevendo no arquivo
  for (var i = 0; i < 100; i++) {
    var values = List<int>.generate(10, (i) => random.nextInt(255));
    var hrash = base64UrlEncode(values);

    sink.write(hrash + '\n');
  }

  // fechando o arquivo
  sink.close();
}

readFile() {
  // criando um objeto file
  var file = File('random.txt');

  // criando um objeto stream
  // por padrão o método openRead() retorna um objeto Stream<List<int>>
  var stream = file.openRead();

  // criando um objeto que lê o stream
  var lines = stream.transform(utf8.decoder).transform(LineSplitter());

  var lineNumber = 0;

  //print a line with index
  lines.listen((line) {
    print('${lineNumber++}: $line');
  });
}

void main() {
  createFile();
  readFile();
}

Nesse exemplo, criamos um arquivo com 100 linhas de strings aleatórias e lemos o arquivo linha por linha.

o método transform() recebe um objeto StreamTransformer.

o método LineSplitter() é um StreamTransformer que transforma um stream de String em um stream de linhas.

o objeto utf8.decoder é um StreamTransformer que transforma um stream de bytes em um stream de String.

Executando o código do exemplo teremos a seguinte saída:

0: EeiYFJSvOycnwA==
1: tojGQMrPzmOI5A==
2: uJi0hk4TDlueMA==
3: iHLTOi-BBuIW3w==
4: hlweYxCyPw5r9A==
5: n0RvyFOOcJAnUg==
6: Er8PkAYeGZmUbw==
7: z9kweXDS-M6dIA==
8: O9L2aYB788bEeQ==
9: Ai4Ea-ZJ9MgLtA==
10: 0fHJXvzSvN-MhA==
...

Referências


Para entender melhor e de forma geral o conceito de streams visite os seguintes links: