Paralelná dekompozícia v kotlinovských korutinách

2024/01/07

Ukážme si delenie úlohy na paralelné výpočty s použitím kotlinovských korutín.

Počítajme slová v samostatných korutinách bežiacich paralelne a spočítavajme celkové výsledky.

Urobme si funkciu na počítanie slov v súbore:

fun File.wc(): Int = useLines { lines ->
    lines.flatMap {
        it.split(Regex("\\s+"))
    }.count()
}

Ak chceme počítať súbory v adresári, vytvorme si pomocnú funkciu:

fun File.children(): Array<File> {
    return listFiles { child: File -> child.isFile } ?: emptyArray()
}

Sekvenčné počítanie

Sekvenčné počítanie je jednoduché:

fun main() {
    val wordCounts = File("src/test/resources").children().map {
        it.wc()
    }
    println(wordCounts)
}

Čo takto však rýchlejšie paralelné výpočty?

Paralelné výpočty

Paralelné spustenie potrebuje najmä:

  • spustenie úlohy v sekcii async,

  • získanie objektu s úlohou Deferred,

  • vyčkanie na dobehnutie úloh pomocou a získanie výsledkov cez awaitAll,

  • spravovanie chýb a rozsah platnosti cez coroutineScope

coroutineScope { (3)
    val jobs = mutableListOf<Deferred<Int>>()
    File("src/test/resources").children().forEach {
        jobs += async(Dispatchers.IO) { (1)
            it.wc()
        }
    }
    val wordCounts = jobs.awaitAll() (2)
    println(wordCounts)
}
1 Výpočet pustíme v samostatnej korutine na pozadí. Keďže ide o vstupno-výstupnú operáciu, použijeme dispečera pre I/O.

Výsledkom je objekt Deferred reprezentujúci úlohu.

2 Vyčkáme na dobehnutie všetkých úloh v korutinách.
3 Ustanovíme záber (scope) korutín. Ak ktorákoľvek z úloh zlyhá s výnimkou, všetky ostatné korutiny pre výpočet počtov slov sa zrušia tiež. Zároveň zabezpečíme, že tento scope dobehne až keď dobehnú korutiny, ktoré sa v ňom spustia.
Objekt Deferred reprezentuje „budúci výsledok“, či „hodnotu, ktorá sa eventuálne vypočíta“. Ke filozoficky podobný CompletableFuture (Java), či Promise (JavaScript).

Ak to chceme vyskúšať v main-e, musíme použiť runBlocking:

fun main() = runBlocking {
    coroutineScope {
        val jobs = mutableListOf<Deferred<Int>>()
        File("src/test/resources").children().forEach {
            jobs += async(Dispatchers.IO) {
                it.wc()
            }
        }
        val wordCounts = jobs.awaitAll()
        println(wordCounts)
    }
}

Ak chceme byť viac funkcionálni a zbaviť sa premennej jobs:

coroutineScope {
    val wordCounts = File("src/test/resources")
        .children()
        .map {
            async(Dispatchers.IO) {
                it.wc()
            }
        }
        .awaitAll()
    println(wordCounts)
}
Filozofia async-awaitAll skôr pripomína filozofiu „fork-join“ či „map-reduce“, kde sa v bloku async spustia paralelné úlohy a v bloku awaitAll pozbierajú výsledky.

Sekcia coroutineScope dbá na to, aby sa v prípade chýb celá operácia rovno zrušila.

Kombinácia coroutineScope, async a awaitAll umožňuje štruktúrovanú konkurentnosť (structured concurrency):

  • Nové korutiny sa spúšťajú v rámci konkrétneho scope, ktorý určuje ich životnosť.

  • Dbá sa na to, aby beh korutiny „neunikol“ mimo životnosti rodiča.

  • Výnimky sa korektne spracujú tak, aby sa nenarušili životnosti korutín a rodičovského scopu.

Na našom konkrétnom príklade:

  • coroutineScope hovorí, že sa počká na dobehnutie korutín v bloku. Životnosť korutín pre počítanie slov v bloku coroutineScope { …​ } je teda vždy kratšia ako životnosť celého scope coroutineSCope.

  • samotný coroutineScope má takú životnosť ako rodič. Rodičom coroutineScope je scope v runBlocking a jeho životnosť je vždy kratšia ako životnosť celej aplikácie.

A výnimky?

  • Ak nastane výnimka v ktorejkoľvek korutine, nesmie sa stať, že pravidlo o „matrioškových životnostiach“ sa poruší. Rodič sa nesmie zrušiť, kým bežia deti a naopak: ak sa zruší rodič, musia sa zrušiť aj jeho deti. Okrem toho rodič vyčkáva na dobehnutie svojich detí!

Výnimky a korutiny

Predstavme si, že chceme počítať slová nad zoznamom súborov, kde jeden z nich nejestvuje:

coroutineScope {
    val dir = File("src/test/resources")
    val wordCounts = listOf(dir.resolve("bible.txt"), dir.resolve("bible999.txt"))
        .map {
            async(Dispatchers.IO) {
                it.wc()
            }
        }
        .awaitAll()
    println(wordCounts)
}

Obratom uvidíme výnimku:

Exception in thread "main" java.io.FileNotFoundException: src/test/resources/bible999.txt (No such file or directory)
	at java.base/java.io.FileInputStream.open0(Native Method)
	at java.base/java.io.FileInputStream.open(FileInputStream.java:216)
	at java.base/java.io.FileInputStream.<init>(FileInputStream.java:157)
	at com.github.novotnyr.coroutines.parallelDecomposition.ForkJoinKt.wc(ForkJoin.kt:67)
	at com.github.novotnyr.coroutines.parallelDecomposition.ForkJoinKt$main$1$1$wordCounts$1$1.invokeSuspend(ForkJoin.kt:25)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
	at kotlinx.coroutines.internal.LimitedDispatcher$Worker.run(LimitedDispatcher.kt:115)
	at kotlinx.coroutines.scheduling.TaskImpl.run(Tasks.kt:103)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684)

V prípade výnimky v ktorejkoľvek z korutín spúšťaných v async bloku sa ostatné súrodenecké korutiny zrušia.

Nezabudnime, že coroutineScope plní dva účely:

  1. Pozastaví sa (suspend), kým korutiny, ktoré sú v ňom deklarované, nedobehnú.

  2. Ak tento scope zlyhá — či kvôli zrušeniu (cancel) alebo výnimke — všetky vnorené korutiny sa tiež zrušia.

Supervisor Scope

Supervízorský scope je podobný ako coroutineScope, ale rušenie potomkov prebieha len smerom „od rodiča k potomkom“, nikdy nie naopak.

  1. Ak ktorýkoľvek potomok zlyhá, scope sa nezruší.

  2. Pozastaví sa (suspend), kým korutiny, ktoré sú v ňom deklarované, nedobehnú.

  3. Ak scope zlyhá — či kvôli zrušeniu (cancel) alebo výnimke — všetky vnorené korutiny sa tiež zrušia.

Vyčkávanie na dobehnutie potomkov

Na rozdiel od coroutineScope nie je vhodné používať awaitAll, ktorý skončí v momente, keď ktorýkoľvek z potomkov zlyhá. To sme videli vo výnimke hore.

Namiesto toho budeme vyčkávať jednotlivo a ošetrovať prípadné výnimky, ktoré korutiny vyhodia.

Každý výsledok volania async typu Deferred vyčkáme pomocou samostatného volania await. Toto volanie buď uspeje a vráti výsledok — teda počet slov v súbore — alebo zlyhá s výnimkou, ktorú odchytíme a vhodne spracujeme.

val dir = File("src/test/resources")
supervisorScope {
    val files = listOf("bible.txt", "bible999.txt")
    val wordCounts = files.map { dir.resolve(it) }
        .map {
            async(Dispatchers.IO) {
                it.wc()
            }
        }.map { it: Deferred<Int> -> (1)
            it.runCatching { (3)
                await() (2)
            }.getOrNull() (3)
        }

    println(wordCounts)
}
1 Prúd objektov Deferred postupne spracujeme po jednom.
2 Na každý Deferred vyčkáme cez await.
3 Ak nastane výnimka, odchytíme ju a vrátime null.

Výsledkom bude zoznam, kde niektoré prvky budú obsahovať počet slov v súbore a pre nedostupné veľkosti kvôli výnimkám bude v zozname null.

[796494, null]

Bloky runBlocking a coroutineScope

Blok runBlocking je coroutine builder, teda nástroj na zostrojenie a spustenie korutiny. Jeho jediné použitie je pri premostení sveta bežného programovania a sveta, v ktorom je možné spúšťať suspend funkcie — typicky len v metóde main, historických knižniciach a testoch.

coroutineScope sa používa len vo svete suspend funkcií. (Samotná funkcia coroutineScope je označená ako suspend). Tento blok nevytvára a nespúšťa novú korutinu.

Oba bloky počkajú na dobehnutie potomkovských korutín, ale runBlocking pri tom zablokuje vlákno v ktorom beží, zatiaľ čo coroutineScope sa pozastaví (suspenduje) bez blokovania.

Oba bloky riešia obojsmerné rušenie: ak zlyhá potomok, zrušia sa aj všetky ostatné potomkovské korutiny a zároveň aj príslušný rodičovský scope.

Priama kombinácia runBlocking a coroutineScope nedáva dohromady zmysel, keďže coroutineScope vyčká na dobehnutie korutín v async a runBlocking tiež počká na dobehnutie tých istých korutín. To je však špeciálna situácia v hračkárskych a tutoriálových textoch.

Jednoduchý, ale výhradne tutoriálový príklad spustí rovno korutinu v dispečeri pre vstupno-výstupné operácie, počítacie korutiny sa spustia cez async v tom istom dispečeri a vyčkávanie na dobehnutie korutín (spolu s vyblokovaním hlavného vlákna) zabezpečí runBlocking.

fun main() = runBlocking(Dispatchers.IO) {
    val wordCounts = File("src/test/resources")
        .children()
        .map {
            async {
                it.wc()
            }
        }.awaitAll()

    println(wordCounts)
}
>> Home