Dart: Streams
Stream é uma sequência de eventos assíncronos. Ou seja, é uma sequência de dados que são emitidos ao longo do tempo.
Exemplo básico:
var stm = Stream.fromIterable([1, 2, 3]);
stm.listen((event) {
print(event);
});
// output: 1 2 3
Nesse exemplo, criamos um stream a partir de uma lista de inteiros e escutamos
os eventos que são emitidos por ele. O método listen
recebe uma função que
será executada a cada evento emitido pelo stream.
Transformações
É possível transformar os valores emitidos por um stream antes de escutá-los
var stm = Stream.fromIterable([1, 2, 3, 4, 5, 6]);
stm
.map((event) => event * 2) // primeira transformação
.where((event) => event % 2 == 0) // segunda transformação
.listen((event) {
print(event);
});
// output: 2 4 6
Aqui nesse exemplo, criamos um stream a partir de uma lista de inteiros e aplicamos duas transformações:
- A primeira transformação multiplica cada valor emitido pelo stream por 2.
- A segunda transformação filtra os valores emitidos pelo stream, retornando apenas os valores pares.
É possível aplicar várias transformações com os métodos:
map
, where
, expand
, take
, skip
, takeWhile
, skipWhile
, distinct
, debounce
, etc..
Referencia completa: Stream Class
Stream.periodic
var stm = Stream.periodic(Duration(seconds: 1), (event) => event);
stm.listen((event) {
print(event);
});
// output: 0 1 2 3 4 5 6 7 8 9 ...
O método periodic
cria um stream que emite um evento a cada período de tempo.
Stream.fromFuture
var stm = Stream.fromFuture(Future.delayed(Duration(seconds: 1), () => 1));
stm.listen((event) {
print(event);
});
// output: 1
O método fromFuture
cria uma stream que emite um evento quando a future é
resolvida. No nosso exemplo, a future é resolvida após 1 segundo e a stream emite o valor 1.
Stream.fromFutures
var stm = Stream.fromFutures([
Future.delayed(Duration(seconds: 1), () => 1),
Future.delayed(Duration(seconds: 2), () => 2),
Future.delayed(Duration(seconds: 3), () => 3),
]);
stm.listen((event) {
print(event);
});
O método fromFutures
functiona de forma semelhante ao fromFuture
, porém recebe uma lista de futures e emite um evento para cada future resolvida.
Stream e Generators
Generators são funções especiais que podem produzir umas sequências de valores. essas funções pode ser sincronas e retornar um Iterable ou assíncronas e retornar um Stream.
Exemplo:
Iterable<int> itr() sync* {
yield 1;
yield 2;
yield 3;
}
itr().forEach((element) {
print(element);
});
// output: 1 2 3
Nesse Exemplo criamos um generator que retorna um Iterable
de inteiros, e
iteramos sobre ele com o método forEach
.
Combinando generators com streams:
Stream<int> stm() async* {
for (var i = 0; i < 10; i++) {
yield i;
}
}
stm().listen((event) {
print(event);
});
// output: 0 1 2 3 4 5 6 7 8 9
A função que retorna um stream é marcada com a palavra-chave async*
e dentro dela usamos a palavra-chave yield
para retornar os valores.
Resumindo:
sync*
: retorna um Iterableasync*
: retorna um Stream
StreamController
StreamController é um objeto que pode ser usado para criar e controlar um stream.
Através do controller é possível adicionar eventos ao stream e escutar os eventos emitidos.
Exemplo:
var stm = StreamController<int>();
stm.stream.listen((event) {
print(event);
});
stm.sink.add(1);
stm.sink.add(2);
stm.sink.add(3);
// output: 1 2 3
No Exemplo anterior, criamos um streamController e adicionamos 3 eventos.
Os eventos também podem ser adicionados de forma periódica:
var stm = StreamController<int>();
stm.stream.listen((event) {
print(event);
});
Timer.periodic(Duration(seconds: 1), (timer) {
stm.sink.add(timer.tick);
});
//output: 1 2 3 4 5...
Subscriptions
Subscriptions são objetos que representam uma conexão entre um stream e um listener.
Através de um subscription é possível cancelar a escuta de um stream.
Exemplo:
var stm = Stream.periodic(Duration(seconds: 1), (event) => event);
var subscription = stm.listen((event) {
print(event);
});
Timer(Duration(seconds: 5), () {
subscription.cancel();
});
// output: 0 1 2 3 4
Nesse exemplo, criamos um stream que emite um evento a cada segundo e escutamos os eventos emitidos. Após 5 segundos cancelamos a escuta.
Broadcast
Broadcast é um tipo de stream que permite que vários listeners escutem os eventos emitidos.
Por padrão os streams são single-subscription, ou seja, apenas um listener pode escutar os eventos emitidos.
var stm = Stream
.periodic(Duration(seconds: 1), (event) => event)
stm.listen((event) {
print('listener 1: $event');
});
stm.listen((event) {
print('listener 2: $event');
});
Executando esse exemplo teremos os seguinte error: 👇
Unhandled exception:
Bad state: Stream has already been listened to.
Usando o método asBroadcastStream
podemos transformar um stream single-subscription em um stream broadcast,
dessa forma podemos escutar os eventos emitidos em vários listeners.
var stm = Stream
.periodic(Duration(seconds: 1), (event) => event)
.asBroadcastStream();
stm.listen((event) {
print('listener 1: $event');
});
stm.listen((event) {
print('listener 2: $event');
});
// output:
// listener 1: 0
// listener 2: 0
// listener 1: 1
// listener 2: 1
// listener 1: 2
// listener 2: 2 ...
StreamTransformer
StreamTransformer é um objecto que pode ser usado para transformar um stream.
Exemplo:
var stm = Stream
.periodic(Duration(seconds: 1), (event) => event)
.asBroadcastStream();
var transformer = StreamTransformer<int, String>.fromHandlers(
handleData: (data, sink) {
sink.add('value: $data');
},
);
stm.transform(transformer).listen((event) {
print(event);
});
// output: value: 0 value: 1 value: 2 value: 3 ...
Nesse exemplo, criamos um stream que emite um evento a cada segundo e transformamos os valores emitidos em strings.
Lendo arquivos com Streams
import 'dart:convert';
import 'dart:io';
import 'dart:math';
createFile() {
// criando arquivo
var file = File('random.txt');
// criando um objeto randomico
var random = Random.secure();
// criando um objeto que escreve no arquivo
var sink = file.openWrite();
// escrevendo no arquivo
for (var i = 0; i < 100; i++) {
var values = List<int>.generate(10, (i) => random.nextInt(255));
var hrash = base64UrlEncode(values);
sink.write(hrash + '\n');
}
// fechando o arquivo
sink.close();
}
readFile() {
// criando um objeto file
var file = File('random.txt');
// criando um objeto stream
// por padrão o método openRead() retorna um objeto Stream<List<int>>
var stream = file.openRead();
// criando um objeto que lê o stream
var lines = stream.transform(utf8.decoder).transform(LineSplitter());
var lineNumber = 0;
//print a line with index
lines.listen((line) {
print('${lineNumber++}: $line');
});
}
void main() {
createFile();
readFile();
}
Nesse exemplo, criamos um arquivo com 100 linhas de strings aleatórias e lemos o arquivo linha por linha.
o método transform()
recebe um objeto StreamTransformer
.
o método LineSplitter()
é um StreamTransformer
que transforma um stream de String em um stream de linhas.
o objeto utf8.decoder
é um StreamTransformer
que transforma um stream de bytes em um stream de String.
Executando o código do exemplo teremos a seguinte saída:
0: EeiYFJSvOycnwA==
1: tojGQMrPzmOI5A==
2: uJi0hk4TDlueMA==
3: iHLTOi-BBuIW3w==
4: hlweYxCyPw5r9A==
5: n0RvyFOOcJAnUg==
6: Er8PkAYeGZmUbw==
7: z9kweXDS-M6dIA==
8: O9L2aYB788bEeQ==
9: Ai4Ea-ZJ9MgLtA==
10: 0fHJXvzSvN-MhA==
...
Referências
Para entender melhor e de forma geral o conceito de streams visite os seguintes links: