Deľba práce v korutinách cez kanáy)

2024/01/28

Ukážme si delenie roboty medzi korutiny pomocou spoločného kanála.

Rátajme veľkosti podadresárov v adresári paralelne!

Rozkaz znel jasne: zrátať veľkosť adresára vrátane podadresárov a to čo najrýchlejšie.

Na to potrebujeme:

  1. Zrátať rekurzívne veľkosť jedného adresára.

  2. Rozumne rozdeliť robotu medzi viacerých pracovníkov.

Veľkosť jedného adresára

Veľkosť jedného adresára zrátame napríklad takto:

fun Path.totalSize(): Long = Files.newDirectoryStream(this).use { dirs -> (1)(2)(3)
    dirs.sumOf { child: Path -> (4)
        when {
            child.isRegularFile() -> child.fileSize() (6)
            child.isDirectory() -> child.totalSize() (5)
            else -> 0
        }
    }
}
1 Založíme extension function na triede java.nio.file.Path.
2 Získame prúd súborov a adresárov typu DirectoryStream.
3 Keďže tento prúd musíme korektne uzatvoriť po skončení práce, využijeme kotlinovský blok use, čo je ekvivalent javáckeho try-with-resources.
4 Dohromady zosumarizujeme veľkosti podadresárov.
5 Ak je to adresár, jeho veľkosť zistíme rekurzívne.
6 Ak je to bežný súbor, jeho veľkosť zistíme napiamo.

Delenie práce

Spustíme dve korutiny:

  1. Korutina, ktorá zozbiera podadresáre v danom adresári.

  2. Korutina, ktorá pre daný adresár zistí jeho celkovú veľkosť.

Obe korutiny budú komunikovať prostredníctvom kanála (channel), ktorým budú putovať adresáre súce na spočítanie celkovej veľkosti. Dátový typu adresárov bude Path.

Korutina na zber podadresárov v adresári

Blok produce predstavuje coroutine builder, teda blok, ktorý nastaví a spustí novú korutinu.

Okrem toho, že sa spustí nová korutina, sa pripraví aj nový kanál, do ktorého sa budú posielať elementy určené na konzumovanie z inej korutiny.

fun CoroutineScope.directories(path: Path) = produce { (1)
    Files.newDirectoryStream(path, Path::isDirectory).forEach {
        logger.info("Submitting $it to a channel")
        channel.send(it) (2)
    }
    channel.close() (3)
}
1 Spustíme novú korutinu. Korutina vyžaduje scope — teda rozsah platnosti, v ktorom pobeží. Keďže funkcia spúšťa novú korutinu, podľa konvencie je CoroutineScope uvedený ako jej prijímač (receiver).
2 Každý adresár v danom adresári pošleme do kanála ako kandidáta na výpočet celkovej veľkosti.
3 Na konci kanál zavrieme — je to akýsi ekvivalent EOF („end-of-file“), teda koncu dát.

Distribúcia práce

Distribúcia práce bude nasledovná:

  1. získame kanál, do ktorého budeme posielať adresáre súce na výpočet

  2. spustíme niekoľko korutín, ktoré budú zo spoločného kanála čítať adresáre

  3. každý adresár bude spracovaný a zistí sa jeho veľkosť.

Ukážme si kód:

fun main(): Unit = runBlocking {
    val rootDir = Path.of(".m2") (1)
    val channel: ReceiveChannel<Path> = directories(rootDir) (2)
    repeat(5) { (3)
        launch(Dispatchers.Default) { (4)
            for (path in channel) { (5)
                val size = path.totalSize() (6)
                logger.info("$path: $size")
            }
        }
    }
}
1 Nastavme iniciálny adresár.
2 Funkcia directories vyžaduje coroutine scope (záber platnosti), ktorý prevezmeme od rodiča. Rodičom je v tomto prípade záber runBlocking. Výsledkom volania je kanál určený výhradne na čítanie, z ktorého vypadávajú cesty Path.
3 Spustíme päť korutín na čítanie zo spoločného kanála.
4 Každú korutinu spustíme nanovo v dispečeri Default.
5 Zo spoločného kanála čítame pomocou cyklu for.
6 Získame cestu, pre ktorú spočítame celkovú veľkosť.

Vzorový výpis

Ukážme si vzorový výpis:

00:27:27.119 [main @coroutine#2] INFO df -- Submitting .m2/repository to a channel
00:27:27.122 [main @coroutine#2] INFO df -- Submitting .m2/wrapper to a channel
00:27:27.123 [main @coroutine#2] INFO df -- Submitting .m2/.lemminx-maven to a channel
00:27:27.123 [main @coroutine#2] INFO df -- Submitting .m2/mvnd to a channel
00:27:27.125 [DefaultDispatcher-worker-2 @coroutine#7] INFO df -- .m2/mvnd: 2656560
00:27:27.125 [DefaultDispatcher-worker-4 @coroutine#6] INFO df -- .m2/.lemminx-maven: 0
00:27:27.133 [DefaultDispatcher-worker-3 @coroutine#5] INFO df -- .m2/wrapper: 76772816
00:27:28.799 [DefaultDispatcher-worker-1 @coroutine#4] INFO df -- .m2/repository: 15613915434

Načítavanie adresárov sa deje v korutine bežiacej v hlavnom vlákne main. Všetky adresáre sa zapíšu do spoločného kanála.

Následne vidíme, ako každá z korutín beží v samostatnom vlákne (DefaultDispatcher-worker) a nezávisle ráta veľkosti jednotlivých adresárov.

Kanál typu fan out

Toto je ukážka delenia práce — kanál slúži ako centrálne miesto, v ktorom sa kopí robota a jednotlivé korutiny si z neho vyberajú úlohy a dokola spracovávajú.

Každá korutina dostane jednu úlohu presne raz!

Podrobnosti možno nájsť v dokumentácii.

>> Home