Ukážme si delenie úlohy na paralelné výpočty s použitím kotlinovských korutín.
Počítajme slová v samostatných korutinách bežiacich paralelne a spočítavajme celkové výsledky.
Ukážme si delenie úlohy na paralelné výpočty s použitím kotlinovských korutín.
Počítajme slová v samostatných korutinách bežiacich paralelne a spočítavajme celkové výsledky.
Urobme si funkciu na počítanie slov v súbore:
fun File.wc(): Int = useLines { lines ->
lines.flatMap {
it.split(Regex("\\s+"))
}.count()
}
Ak chceme počítať súbory v adresári, vytvorme si pomocnú funkciu:
fun File.children(): Array<File> {
return listFiles { child: File -> child.isFile } ?: emptyArray()
}
Sekvenčné počítanie je jednoduché:
fun main() {
val wordCounts = File("src/test/resources").children().map {
it.wc()
}
println(wordCounts)
}
Čo takto však rýchlejšie paralelné výpočty?
Paralelné spustenie potrebuje najmä:
spustenie úlohy v sekcii async
,
získanie objektu s úlohou Deferred
,
vyčkanie na dobehnutie úloh pomocou a získanie výsledkov cez awaitAll
,
spravovanie chýb a rozsah platnosti cez coroutineScope
coroutineScope { (3)
val jobs = mutableListOf<Deferred<Int>>()
File("src/test/resources").children().forEach {
jobs += async(Dispatchers.IO) { (1)
it.wc()
}
}
val wordCounts = jobs.awaitAll() (2)
println(wordCounts)
}
1 | Výpočet pustíme v samostatnej korutine na pozadí.
Keďže ide o vstupno-výstupnú operáciu, použijeme dispečera pre I/O.
Výsledkom je objekt |
2 | Vyčkáme na dobehnutie všetkých úloh v korutinách. |
3 | Ustanovíme záber (scope) korutín. Ak ktorákoľvek z úloh zlyhá s výnimkou, všetky ostatné korutiny pre výpočet počtov slov sa zrušia tiež. Zároveň zabezpečíme, že tento scope dobehne až keď dobehnú korutiny, ktoré sa v ňom spustia. |
Objekt Deferred reprezentuje „budúci výsledok“, či „hodnotu, ktorá sa eventuálne vypočíta“. Ke filozoficky podobný CompletableFuture (Java), či Promise (JavaScript).
|
Ak to chceme vyskúšať v main
-e, musíme použiť runBlocking
:
fun main() = runBlocking {
coroutineScope {
val jobs = mutableListOf<Deferred<Int>>()
File("src/test/resources").children().forEach {
jobs += async(Dispatchers.IO) {
it.wc()
}
}
val wordCounts = jobs.awaitAll()
println(wordCounts)
}
}
Ak chceme byť viac funkcionálni a zbaviť sa premennej jobs
:
coroutineScope {
val wordCounts = File("src/test/resources")
.children()
.map {
async(Dispatchers.IO) {
it.wc()
}
}
.awaitAll()
println(wordCounts)
}
Filozofia async -awaitAll skôr pripomína filozofiu „fork-join“ či „map-reduce“, kde sa v bloku async spustia paralelné úlohy a v bloku awaitAll pozbierajú výsledky.
|
Sekcia coroutineScope
dbá na to, aby sa v prípade chýb celá operácia rovno zrušila.
Kombinácia coroutineScope
, async
a awaitAll
umožňuje štruktúrovanú konkurentnosť (structured concurrency):
Nové korutiny sa spúšťajú v rámci konkrétneho scope, ktorý určuje ich životnosť.
Dbá sa na to, aby beh korutiny „neunikol“ mimo životnosti rodiča.
Výnimky sa korektne spracujú tak, aby sa nenarušili životnosti korutín a rodičovského scopu.
Na našom konkrétnom príklade:
coroutineScope
hovorí, že sa počká na dobehnutie korutín v bloku.
Životnosť korutín pre počítanie slov v bloku coroutineScope { … }
je teda vždy kratšia ako životnosť celého scope coroutineSCope
.
samotný coroutineScope
má takú životnosť ako rodič.
Rodičom coroutineScope
je scope v runBlocking
a jeho životnosť je vždy kratšia ako životnosť celej aplikácie.
A výnimky?
Ak nastane výnimka v ktorejkoľvek korutine, nesmie sa stať, že pravidlo o „matrioškových životnostiach“ sa poruší. Rodič sa nesmie zrušiť, kým bežia deti a naopak: ak sa zruší rodič, musia sa zrušiť aj jeho deti. Okrem toho rodič vyčkáva na dobehnutie svojich detí!
Predstavme si, že chceme počítať slová nad zoznamom súborov, kde jeden z nich nejestvuje:
coroutineScope {
val dir = File("src/test/resources")
val wordCounts = listOf(dir.resolve("bible.txt"), dir.resolve("bible999.txt"))
.map {
async(Dispatchers.IO) {
it.wc()
}
}
.awaitAll()
println(wordCounts)
}
Obratom uvidíme výnimku:
Exception in thread "main" java.io.FileNotFoundException: src/test/resources/bible999.txt (No such file or directory) at java.base/java.io.FileInputStream.open0(Native Method) at java.base/java.io.FileInputStream.open(FileInputStream.java:216) at java.base/java.io.FileInputStream.<init>(FileInputStream.java:157) at com.github.novotnyr.coroutines.parallelDecomposition.ForkJoinKt.wc(ForkJoin.kt:67) at com.github.novotnyr.coroutines.parallelDecomposition.ForkJoinKt$main$1$1$wordCounts$1$1.invokeSuspend(ForkJoin.kt:25) at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108) at kotlinx.coroutines.internal.LimitedDispatcher$Worker.run(LimitedDispatcher.kt:115) at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:103) at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584) at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793) at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697) at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684)
V prípade výnimky v ktorejkoľvek z korutín spúšťaných v async
bloku sa ostatné súrodenecké korutiny zrušia.
Nezabudnime, že
|
Supervízorský scope je podobný ako coroutineScope
, ale rušenie potomkov prebieha len smerom „od rodiča k potomkom“, nikdy nie naopak.
Ak ktorýkoľvek potomok zlyhá, scope sa nezruší.
Pozastaví sa (suspend), kým korutiny, ktoré sú v ňom deklarované, nedobehnú.
Ak scope zlyhá — či kvôli zrušeniu (cancel) alebo výnimke — všetky vnorené korutiny sa tiež zrušia.
Na rozdiel od coroutineScope
nie je vhodné používať awaitAll
, ktorý skončí v momente, keď ktorýkoľvek z potomkov zlyhá.
To sme videli vo výnimke hore.
Namiesto toho budeme vyčkávať jednotlivo a ošetrovať prípadné výnimky, ktoré korutiny vyhodia.
Každý výsledok volania async
typu Deferred
vyčkáme pomocou samostatného volania await
.
Toto volanie buď uspeje a vráti výsledok — teda počet slov v súbore — alebo zlyhá s výnimkou, ktorú odchytíme a vhodne spracujeme.
val dir = File("src/test/resources")
supervisorScope {
val files = listOf("bible.txt", "bible999.txt")
val wordCounts = files.map { dir.resolve(it) }
.map {
async(Dispatchers.IO) {
it.wc()
}
}.map { it: Deferred<Int> -> (1)
it.runCatching { (3)
await() (2)
}.getOrNull() (3)
}
println(wordCounts)
}
1 | Prúd objektov Deferred postupne spracujeme po jednom. |
2 | Na každý Deferred vyčkáme cez await . |
3 | Ak nastane výnimka, odchytíme ju a vrátime null . |
Výsledkom bude zoznam, kde niektoré prvky budú obsahovať počet slov v súbore a pre nedostupné veľkosti kvôli výnimkám bude v zozname null
.
[796494, null]
runBlocking
a coroutineScope
Blok runBlocking
je coroutine builder, teda nástroj na zostrojenie a spustenie korutiny.
Jeho jediné použitie je pri premostení sveta bežného programovania a sveta, v ktorom je možné spúšťať suspend
funkcie — typicky len v metóde main
, historických knižniciach a testoch.
coroutineScope
sa používa len vo svete suspend
funkcií. (Samotná funkcia coroutineScope
je označená ako suspend
). Tento blok nevytvára a nespúšťa novú korutinu.
Oba bloky počkajú na dobehnutie potomkovských korutín, ale runBlocking
pri tom zablokuje vlákno v ktorom beží, zatiaľ čo coroutineScope
sa pozastaví (suspenduje) bez blokovania.
Oba bloky riešia obojsmerné rušenie: ak zlyhá potomok, zrušia sa aj všetky ostatné potomkovské korutiny a zároveň aj príslušný rodičovský scope.
Priama kombinácia runBlocking a coroutineScope nedáva dohromady zmysel, keďže coroutineScope vyčká na dobehnutie korutín v async a runBlocking tiež počká na dobehnutie tých istých korutín.
To je však špeciálna situácia v hračkárskych a tutoriálových textoch.
|
Jednoduchý, ale výhradne tutoriálový príklad spustí rovno korutinu v dispečeri pre vstupno-výstupné operácie, počítacie korutiny sa spustia cez async
v tom istom dispečeri a vyčkávanie na dobehnutie korutín (spolu s vyblokovaním hlavného vlákna) zabezpečí runBlocking
.
fun main() = runBlocking(Dispatchers.IO) {
val wordCounts = File("src/test/resources")
.children()
.map {
async {
it.wc()
}
}.awaitAll()
println(wordCounts)
}