Nous terminons ce chapitre en combinant ce que vous avez appris dans le Chapitre 10, “Streams”, avec les concepts que vous avez appris dans ce chapitre. L’une des fonctionnalités les plus puissantes de l’API Stream est le support intégré de la concurrence. Jusqu’à présent, tous les streams avec lesquels vous avez travaillé étaient des streams séquentiels. Un stream séquentiel est un stream dans lequel les résultats sont ordonnés, avec un seul élément traité à la fois.
Un stream parallèle est capable de traiter les résultats simultanément, en utilisant plusieurs threads. Par exemple, vous pouvez utiliser un stream parallèle et l’opération map()
pour opérer simultanément sur les éléments du stream, améliorant considérablement les performances par rapport au traitement d’un seul élément à la fois.
L’utilisation d’un stream parallèle peut modifier non seulement les performances de votre application, mais aussi les résultats attendus. Comme vous le verrez, certaines opérations nécessitent également une gestion spéciale pour pouvoir être traitées de manière parallèle.
Le nombre de threads disponibles dans un stream parallèle est proportionnel au nombre de CPU disponibles dans votre environnement.
Création de Streams Parallèles
L’API Stream a été conçue pour rendre la création de streams parallèles assez simple. Vous devriez être familier avec deux façons de créer un stream parallèle.
Collection<Integer> collection = List.of(1,2);
Stream<Integer> p1 = collection.stream().parallel();
Stream<Integer> p2 = collection.parallelStream();
La première façon de créer un stream parallèle est à partir d’un stream existant. N’est-ce pas génial ? Un stream peut être rendu parallèle ! La deuxième façon de créer un stream parallèle est à partir d’une classe Collection Java. Nous utilisons ces deux méthodes tout au long de cette section.
L’interface Stream inclut une méthode isParallel()
qui peut être utilisée pour tester si l’instance d’un stream prend en charge le traitement parallèle. Certaines opérations sur les streams préservent l’attribut parallèle, tandis que d’autres non.
Réalisation d’une Décomposition Parallèle
Une décomposition parallèle est le processus qui consiste à prendre une tâche, à la décomposer en morceaux plus petits qui peuvent être exécutés simultanément, puis à réassembler les résultats. Plus une décomposition est concurrente, plus l’amélioration des performances liée à l’utilisation des streams parallèles est importante.
Essayons. Tout d’abord, définissons une fonction réutilisable qui “fait du travail” simplement en attendant cinq secondes.
private static int faireTravail(int entree) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {}
return entree;
}
Nous pouvons prétendre que dans une application réelle, ce travail pourrait impliquer l’appel à une base de données ou la lecture d’un fichier. Maintenant, utilisons cette méthode avec un stream séquentiel.
long debut = System.currentTimeMillis();
List.of(1,2,3,4,5)
.stream()
.map(w -> faireTravail(w))
.forEach(s -> System.out.print(s + " "));
System.out.println();
var tempsPris = (System.currentTimeMillis()-debut)/1000;
System.out.println("Temps: "+tempsPris+" secondes");
Que pensez-vous que ce code va produire lorsqu’il est exécuté dans le cadre d’une méthode main()
? Regardons :
1 2 3 4 5
Temps: 25 secondes
Comme vous pouviez vous y attendre, les résultats sont ordonnés et prévisibles car nous utilisons un stream séquentiel. Cela a également pris environ 25 secondes pour traiter les cinq résultats, un à la fois. Que se passe-t-il si nous remplaçons la ligne du stream par une qui utilise parallelStream()
? Voici un exemple de sortie :
3 2 1 5 4
Temps: 5 secondes
Comme vous pouvez le voir, les résultats ne sont plus ordonnés ou prévisibles. Les opérations map()
et forEach()
sur un stream parallèle sont équivalentes à la soumission de plusieurs expressions lambda Runnable
à un exécuteur de threads en pool, puis à l’attente des résultats.
Qu’en est-il du temps requis ? Dans ce cas, notre système disposait de suffisamment de CPU pour que toutes les tâches soient exécutées simultanément. Si vous exécutiez ce même code sur un ordinateur avec moins de processeurs, il pourrait afficher 10 secondes, 15 secondes ou une autre valeur. L’essentiel est que nous avons écrit notre code pour tirer parti du traitement parallèle lorsqu’il est disponible, donc notre travail est terminé.
Ordonnancement des Résultats
Si votre opération de stream a besoin de garantir l’ordre et que vous n’êtes pas sûr si elle est séquentielle ou parallèle, vous pouvez remplacer la ligne 14 par une qui utilise forEachOrdered()
:
.forEachOrdered(s -> System.out.print(s + " "));
Cela affiche les résultats dans l’ordre dans lequel ils sont définis dans le stream :
1 2 3 4 5
Temps: 5 secondes
Bien que nous ayons perdu une partie des gains de performance liés à l’utilisation d’un stream parallèle, notre opération map()
peut toujours tirer parti du stream parallèle.
Traitement des Réductions Parallèles
Outre l’amélioration potentielle des performances et la modification de l’ordre des opérations, l’utilisation de streams parallèles peut avoir un impact sur la façon dont vous écrivez votre application. Une réduction parallèle est une opération de réduction appliquée à un stream parallèle. Les résultats des réductions parallèles peuvent différer de ce que vous attendez lorsque vous travaillez avec des streams séquentiels.
Exécution de Tâches Basées sur l’Ordre
Comme l’ordre n’est pas garanti avec les streams parallèles, des méthodes comme findAny()
sur les streams parallèles peuvent entraîner un comportement inattendu. Considérez l’exemple suivant :
System.out.print(List.of(1,2,3,4,5,6)
.parallelStream()
.findAny()
.get());
La JVM alloue un certain nombre de threads et renvoie la valeur du premier à renvoyer un résultat, qui pourrait être 4, 2, etc. Bien que ni le stream séquentiel ni le stream parallèle ne garantissent de renvoyer la première valeur, le stream séquentiel le fait souvent. Avec un stream parallèle, les résultats sont susceptibles d’être plus aléatoires.
Qu’en est-il des opérations qui tiennent compte de l’ordre, comme findFirst()
, limit()
et skip()
? L’ordre est toujours préservé, mais les performances peuvent être affectées sur un stream parallèle en raison d’une tâche de traitement parallèle forcée de coordonner tous ses threads d’une manière similaire à la synchronisation.
Du côté positif, les résultats des opérations ordonnées sur un stream parallèle seront cohérents avec un stream séquentiel. Par exemple, l’appel de skip(5).limit(2).findFirst()
renverra le même résultat sur des streams séquentiels et parallèles ordonnés.
Scénario du Monde Réel
Création de Streams Non Ordonnés
Tous les streams avec lesquels vous avez travaillé sont considérés comme ordonnés par défaut. Il est possible de créer un stream non ordonné à partir d’un stream ordonné, similaire à la façon dont vous créez un stream parallèle à partir d’un stream séquentiel.
List.of(1,2,3,4,5,6).stream().unordered();
Cette méthode ne réordonne pas les éléments ; elle indique simplement à la JVM que si une opération de stream basée sur l’ordre est appliquée, l’ordre peut être ignoré. Par exemple, l’appel de skip(5)
sur un stream non ordonné ignorera n’importe quels 5 éléments, pas nécessairement les 5 premiers requis sur un stream ordonné.
Pour les streams séquentiels, l’utilisation d’une version non ordonnée n’a aucun effet. Mais sur les streams parallèles, les résultats peuvent considérablement améliorer les performances.
List.of(1,2,3,4,5,6).stream().unordered().parallel();
Si vous développez des applications avec des streams parallèles, vous devriez savoir quand appliquer un stream non ordonné pour améliorer les performances.
Combinaison des Résultats avec reduce()
Comme vous l’avez appris dans le Chapitre 10, l’opération de stream reduce()
combine un stream en un seul objet. Rappelons que le premier paramètre de la méthode reduce()
est appelé l’identité, le deuxième paramètre est appelé l’accumulateur, et le troisième paramètre est appelé le combinateur. Voici la signature de la méthode :
<U> U reduce(U identite,
BiFunction<U,? super T,U> accumulateur,
BinaryOperator<U> combinateur)
Nous pouvons concaténer une liste de valeurs char
en utilisant la méthode reduce()
, comme le montre l’exemple suivant :
System.out.println(List.of('l', 'o', 'u', 'p')
.parallelStream()
.reduce("",
(s1,c) -> s1 + c,
(s2,s3) -> s2 + s3)); // loup
La dénomination des variables dans cet exemple de stream n’est pas accidentelle. Nous avons utilisé c pour caractère, tandis que s1, s2 et s3 sont des valeurs String.
Sur les streams parallèles, la méthode reduce()
fonctionne en appliquant la réduction à des paires d’éléments dans le stream pour créer des valeurs intermédiaires, puis en combinant ces valeurs intermédiaires pour produire un résultat final. En d’autres termes, dans un stream séquentiel, “loup” est construit un caractère à la fois. Dans un stream parallèle, les valeurs intermédiaires “lo” et “up” sont créées puis combinées.
Avec les streams parallèles, nous devons maintenant nous préoccuper de l’ordre. Que se passe-t-il si les éléments d’une chaîne sont combinés dans le mauvais ordre pour produire “lpou” ou “ulpo” ? L’API Stream évite ce problème tout en permettant aux streams d’être traités en parallèle, tant que vous suivez une règle simple : assurez-vous que l’accumulateur et le combinateur produisent le même résultat indépendamment de l’ordre dans lequel ils sont appelés.
Bien que ce ne soit pas dans le cadre du cours, l’accumulateur et le combinateur doivent être associatifs, non interférents et sans état.
Alors que les exigences pour les arguments d’entrée de la méthode reduce()
sont vraies pour les streams séquentiels et parallèles, vous n’avez peut-être pas remarqué de problèmes dans les streams séquentiels car le résultat était toujours ordonné. Avec les streams parallèles, cependant, l’ordre n’est plus garanti, et tout argument qui viole ces règles est beaucoup plus susceptible de produire des effets secondaires ou des résultats imprévisibles.
Examinons un exemple utilisant un accumulateur problématique. En particulier, l’ordre est important lors de la soustraction de nombres ; par conséquent, le code suivant peut produire différentes valeurs selon que vous utilisez un stream séquentiel ou parallèle. Nous pouvons omettre un paramètre combinateur dans ces exemples, car l’accumulateur peut être utilisé lorsque les types de données intermédiaires sont les mêmes.
System.out.println(List.of(1,2,3,4,5,6)
.parallelStream()
.reduce(0, (a, b) -> (a - b))); // ACCUMULATEUR PROBLÉMATIQUE
Il peut donner -21, 3 ou une autre valeur.
Vous pouvez voir d’autres problèmes si nous utilisons un paramètre identité qui n’est pas vraiment une valeur d’identité. Par exemple, à quoi vous attendez-vous à ce que le code suivant produise ?
System.out.println(List.of("l","o","u","p")
.parallelStream()
.reduce("X", String::concat)); // XlXoXuXp
Sur un stream séquentiel, il imprime Xloup, mais sur un stream parallèle, le résultat est XlXoXuXp. Dans le cadre du processus parallèle, l’identité est appliquée à plusieurs éléments du stream, ce qui donne des données très inattendues.
Sélection d’une Méthode reduce()
Bien que les versions à un et deux arguments de reduce()
prennent en charge le traitement parallèle, il est recommandé d’utiliser la version à trois arguments de reduce()
lorsque vous travaillez avec des streams parallèles. Fournir une méthode combinateur explicite permet à la JVM de partitionner les opérations dans le stream plus efficacement.
Combinaison des Résultats avec collect()
Comme reduce()
, l’API Stream inclut une version à trois arguments de collect()
qui prend des opérateurs accumulateur et combinateur ainsi qu’un opérateur fournisseur au lieu d’une identité.
<R> R collect(Supplier<R> fournisseur,
BiConsumer<R, ? super T> accumulateur,
BiConsumer<R, R> combinateur)
Aussi, comme reduce()
, les opérations accumulateur et combinateur doivent être capables de traiter les résultats dans n’importe quel ordre. De cette manière, la version à trois arguments de collect()
peut être exécutée comme une réduction parallèle, comme le montre l’exemple suivant :
Stream<String> stream = Stream.of("l", "o", "u", "p").parallel();
SortedSet<String> set = stream.collect(ConcurrentSkipListSet::new,
Set::add,
Set::addAll);
System.out.println(set); // [l, o, p, u]
Rappelons que les éléments dans un ConcurrentSkipListSet
sont triés selon leur ordre naturel. Vous devriez utiliser une collection concurrente pour combiner les résultats, en vous assurant que les résultats des threads concurrents ne provoquent pas une ConcurrentModificationException
.
L’exécution de réductions parallèles avec un collecteur nécessite des considérations supplémentaires. Par exemple, si la collection dans laquelle vous insérez est un ensemble de données ordonnées, comme une List
, les éléments de la collection résultante doivent être dans le même ordre, que vous utilisiez un stream séquentiel ou parallèle. Cela peut réduire les performances, car certaines opérations ne peuvent pas être complétées en parallèle.
Exécution d’une Réduction Parallèle sur un Collecteur
Bien que nous ayons couvert l’interface Collector
dans le Chapitre 10, nous n’avons pas entré dans les détails de ses propriétés. Chaque instance de Collector
définit une méthode characteristics()
qui renvoie un ensemble d’attributs Collector.Characteristics
. Lors de l’utilisation d’un Collector
pour effectuer une réduction parallèle, un certain nombre de propriétés doivent être vraies. Sinon, l’opération collect()
s’exécutera de manière monothread.
Exigences pour la Réduction Parallèle avec collect()
- Le stream est parallèle.
- Le paramètre de l’opération
collect()
a la caractéristiqueCharacteristics.CONCURRENT
. - Soit le stream est non ordonné, soit le collecteur a la caractéristique
Characteristics.UNORDERED
.
Par exemple, bien que Collectors.toSet()
ait la caractéristique UNORDERED
, il n’a pas la caractéristique CONCURRENT
. Par conséquent, ce qui suit n’est pas une réduction parallèle même avec un stream parallèle :
parallelStream.collect(Collectors.toSet()); // Pas une réduction parallèle
La classe Collectors
inclut deux ensembles de méthodes statiques pour récupérer des collecteurs, toConcurrentMap()
et groupingByConcurrent()
, qui sont tous deux UNORDERED
et CONCURRENT
. Ces méthodes produisent des instances de Collector
capables d’effectuer des réductions parallèles efficacement. Comme leurs homologues non concurrents, il existe des versions surchargées qui prennent des arguments supplémentaires.
Voici une réécriture d’un exemple du Chapitre 10 pour utiliser un stream parallèle et une réduction parallèle :
Stream<String> mesAnimaux = Stream.of("lions", "tigres", "ours").parallel();
ConcurrentMap<Integer, String> map = mesAnimaux
.collect(Collectors.toConcurrentMap(String::length,
k -> k,
(s1, s2) -> s1 + "," + s2));
System.out.println(map); // {5=lions,ours, 6=tigres}
System.out.println(map.getClass()); // java.util.concurrent.ConcurrentHashMap
Nous utilisons une référence ConcurrentMap
, bien que la classe réelle renvoyée soit probablement ConcurrentHashMap
. La classe particulière n’est pas garantie ; ce sera juste une classe qui implémente l’interface ConcurrentMap
.
Enfin, nous pouvons réécrire notre exemple groupingBy()
du Chapitre 10 pour utiliser un stream parallèle et une réduction parallèle.
var mesAnimaux = Stream.of("lions", "tigres", "ours").parallel();
ConcurrentMap<Integer, List<String>> map = mesAnimaux.collect(
Collectors.groupingByConcurrent(String::length));
System.out.println(map); // {5=[lions, ours], 6=[tigres]}
Comme précédemment, l’objet renvoyé peut être assigné à une référence ConcurrentMap
.
Scénario du Monde Réel
Éviter les Streams à État
Des effets secondaires peuvent apparaître dans les streams parallèles si vos expressions lambda sont à état. Une expression lambda à état est une expression dont le résultat dépend d’un état qui pourrait changer pendant l’exécution d’un pipeline. Par exemple, la méthode suivante qui filtre les nombres pairs est à état :
public List<Integer> ajouterValeurs(IntStream source) {
var donnees = Collections.synchronizedList(new ArrayList<Integer>());
source.filter(s -> s % 2 == 0)
.forEach(i -> { donnees.add(i); }); // À ÉTAT: NE FAITES PAS CELA!
return donnees;
}
Disons que cette méthode est exécutée avec un stream séquentiel :
var liste = ajouterValeurs(IntStream.range(1, 11));
System.out.print(liste); // [2, 4, 6, 8, 10]
Génial, les résultats sont dans le même ordre que celui dans lequel ils ont été entrés. Mais que se passe-t-il si quelqu’un passe un stream parallèle ?
var liste = ajouterValeurs(IntStream.range(1, 11).parallel());
System.out.print(liste); // [6, 8, 10, 2, 4]
Oh non : nos résultats ne correspondent plus à notre ordre d’entrée ! Le problème est que notre expression lambda est à état et modifie une liste qui est en dehors de notre stream. Nous pouvons corriger cette solution en réécrivant notre opération de stream pour qu’elle soit sans état :
public List<Integer> ajouterValeursOptimal(IntStream source) {
return source.filter(s -> s % 2 == 0)
.boxed()
.collect(Collectors.toList());
}
Cette méthode traite le stream puis collecte tous les résultats dans une nouvelle liste. Elle produit le même résultat ordonné sur les streams séquentiels et parallèles. Il est fortement recommandé d’éviter les opérations à état lors de l’utilisation de streams parallèles, afin d’éliminer tout effet secondaire potentiel sur les données. En fait, elles devraient être évitées dans les streams séquentiels puisque cela limite la capacité du code à tirer un jour parti de la parallélisation.