package main
func streamWork(animalChan chan<- Animal) {
for _, animal := range animals {
animalChan <- animal (2)
}
close(animalChan) (3)
}
func main() {
animalChan := make(chan Animal) (1)
streamWork(animalChan) (4)
}
type Animal struct {
species string
weight int
}
var animals = []Animal{
Animal{"slon", 12},
Animal{"hroch", 4},
Animal{"nosorožec", 4},
Animal{"žirafa", 2},
Animal{"bizón", 2},
Animal{"veľryba", 190},
}
Minule sme vážili zvieratá na viacerých gorutinách. Ukážme si situáciu, keď máme worker pool, teda bank konkrétneho počtu gorutín, ktoré si vyberajú robotu zo spoločného frontu, spracovávajú a výsledky posielajú do nového spoločného frontu.
Začnime opäť vážiť zvieratá.
Kostra
V kostre si len vytvorme kanál pre zvieratá, do ktorého budeme posielať jedince súce na váženie.
Rovno povedzme, že toto skončí deadlockom.
1 | Nebufferovaný kanál, z ktorého si budú neskôr pracujúce gorutiny vyberať úlohy. |
2 | Pošleme doňho zvieratá. |
3 | Na konci kanál zavrieme. |
4 | Zavoláme funkciu, ktorá pošle údaje so zvieratami do komunikačného nebufferovaného kanála. |
Toto je samozrejme deadlock: z kanála nik nečíta a program obratom skončí.
Distribúcia práce
Distribuujme teraz prácu cez „manažéra“, ktorý bude čítať z kanála úloh a rozdeľovať prácu.
Na začiatku ju rozdelí sám sebe.
func distributeWork(animalChan <-chan Animal) { (1)
for animal := range animalChan {
log.Printf("%v\n", animal)
}
}
1 | Čítame z kanála, kým sa kanál neuzavrie a zvieratá len vypisujeme. |
Použitie v kóde? Dôležité je predísť deadlocku.
Keďže máme synchrónny nebufferovaný kanál, nemôžeme najprv posielať zvieratá na váženie a potom čakať na úlohy. To by bol jasný deadlock — producent by nemal konzumenta.
Nemôžeme to ani urobiť naopak, pretože by manažér-konzument čakal na producenta, ktorý by sa spustil až neskôr.
Rozseknime go-rdický uzol a spustime jednu z týchto súčiastok v gorutine — a zvoľme si za ňu funkciu, ktorá zaradí údaje do kanála.
func main() {
animalChan := make(chan Animal)
go streamWork(animalChan) (1)
distributeWork(animalChan) (2)
}
1 | V gorutine pustíme „prúd“ zvierat, ktoré sa ocitnú v komunikačnom kanáli. |
2 | Začneme distribuovať prácu. |
Distribúcia práce vo funkcii Distribúcia práce
Deadlock však nenastáva — jednak pre komunikáciu medzi dvoma korutinami a zároveň kvôli jasnému explicitnému uzatvoreniu kanála. |
Robotníci pre tri gorutiny vo Worker Pool.
Začnime naozaj rozdeľovať prácu, a nielen simulovať!
Vytvorme si samostatnú funkciu pre robotnícku triedu, pardón, pre robotnícku funkciu.
Tá bude veľmi pomaly vážiť veľmi veľké zvieratá.
func worker(workerId int, animalChan <-chan Animal) { (1)
for a := range animalChan { (2)
time.Sleep(1 * time.Second)
log.Printf("[%d] %d\t%s\n", workerId, a.weight, a.species) (3)
}
}
1 | Funkcia zoberie identifikátor (pre ladiace účely) a kanál, z ktorého bude načítavať zvieratá. |
2 | Zvieratá berieme z komunikačného kanála, kým sa kanál nezavrie. |
3 | Vážime a vypisujeme. |
Kto zatvára kanál? Producent, teda funkcia streamWork . To je dôležitá obrana proti deadlocku.
|
Upravme aj manažment:
func distributeWork(animalChan <-chan Animal) {
const workerCount = 3
for i := 0; i < workerCount; i++ {
go worker(i, animalChan) (1)
}
}
1 | Spusťme tri — nie viac, nie menej — gorutiny. Pošlime im vstupný kanál. |
Gorutiny budú súperiť o zvieratá v komunikačnom kanáli. |
Spusťme firmu na zvieratá
Ak teraz spustíme main
, neuvidíme nič.
Povaha distributeWork
sa zmenila — už len vystrelí salvu gorutín, tak ako v predošlých životných situáciách, a okamžite skončí.
Keď to chceme urobiť hnusne, budeme čakať.
func main() {
animalChan := make(chan Animal)
go streamWork(animalChan)
distributeWork(animalChan)
time.Sleep(3 * time.Second) (1)
}
1 | Hnusne čakáme. |
Výstup pošle dve salvy za sekundu. Sekundu totiž trvá, kým každý worker
spracuje výsledok.
2023/01/14 20:08:09 [2] 4 nosorožec
2023/01/14 20:08:09 [0] 12 slon
2023/01/14 20:08:09 [1] 4 hroch
2023/01/14 20:08:10 [2] 2 žirafa
2023/01/14 20:08:10 [1] 190 veľryba
2023/01/14 20:08:10 [0] 2 bizón
Čakanie je však škaredé.
Ak chceme čakať na dobehnutie manažéra, znamená to čakať na koniec gorutín.
Je teda čas na WaitGroup
.
func distributeWork(animalChan <-chan Animal) {
const workerCount = 3
var wg sync.WaitGroup
wg.Add(workerCount) (1)
for i := 0; i < workerCount; i++ {
go func(workerId int) { (2)
worker(workerId, animalChan)
wg.Done() (3)
}(i) (2)
}
wg.Wait() (4)
}
1 | Pripravíme si WaitGroup s takým počtom gorutín, koľko máme robotníkov. |
2 | Spustíme v gorutine robotníka.
Použijeme vnorenú funkciu, do ktorej dopravíme identifikátor robotníka cez parameter, pretože využívame premennú i z iterácie. |
3 | Keď robotník skončí, znížime počítadlo vo WaitGroup . |
4 | Čakáme, kým dobehnú všetky gorutiny. |
Takto už nemusíme škaredo čakať:
func main() {
animalChan := make(chan Animal)
go streamWork(animalChan)
distributeWork(animalChan)
}
Program skončí zhruba po 2 sekundách, keď dobehnú obe salvy troch gorutín.
Váženie s výsledkami
Ak chceme, aby gorutiny komunikovali späť, použijeme recepty z predošlých dielov.
-
Funkcia
worker
bude posielať výsledky do samostatného kanála. -
Funkcia
distributeWork
bude čakať na výsledky. -
A vytvoríme samostatnú funkciu na agregáciu výsledných váh.
Robotníčky odpovedajú do kanála
func worker(workerId int,
animalChan <-chan Animal,
weightChan chan<- int) { (1)
for a := range animalChan {
time.Sleep(1 * time.Second)
log.Printf("[%d] %d\t%s\n", workerId, a.weight, a.species)
weightChan <- a.weight (2)
}
}
1 | Očakávame tretí parameter: kanál pre navážené hmotnosti. |
2 | Každú hmotnosť zapíšeme do kanála. |
Agregácia výsledkov
Agregácia výsledkov načíta údaje a zlúči:
func aggregateResults(weights <-chan int) int {
total := 0
for weight := range weights {
total += weight
}
return total
}
Distribúcia roboty prekopaná
Musíme prekopať distribúciu práce:
-
Zavedieme výstupný kanál.
-
Počkáme na výsledky.
-
Agregujeme.
func distributeWork(animalChan <-chan Animal) int {
const workerCount = 3
weights := make(chan int) (1)
var wg sync.WaitGroup
wg.Add(workerCount)
for i := 0; i < workerCount; i++ {
go func(workerId int) {
worker(workerId, animalChan, weights) (2)
wg.Done()
}(i)
}
go func() { (3)
wg.Wait()
close(weights)
}()
return aggregateResults(weights) (4)
}
1 | Vytvoríme kanál pre výstupné výsledky. |
2 | Robotnícka gorutina dostane výstupný kanál ako argument. |
3 | Čakáme na výsledky v samostatnej gorutine. Toto je presne vzor z predošlého dielu. |
4 | Ak dôjdu všetky dáta, agregujeme a výsledok vrátime z funkcie. |
Úprava hlavnej funkcie
Samozrejme, musíme upraviť aj hlavnú funkciu:
func main() {
animalChan := make(chan Animal)
go streamWork(animalChan)
total := distributeWork(animalChan) (1)
log.Printf("[T] %d\t%s\n", total, "Total") (2)
}
1 | Jednoducho rozdelíme robotu a získame výsledok. |
2 | Výsledok vypíšeme. |
Zhrnutie
To je všetko, robota sa veselo rozdeľuje!
Ak odstránime čakanie vo funkcii worker
, uvidíme priame a rýchle rátanie:
2023/01/15 10:57:41 [1] 12 slon
2023/01/15 10:57:41 [1] 2 žirafa
2023/01/15 10:57:41 [0] 4 nosorožec
2023/01/15 10:57:41 [2] 4 hroch
2023/01/15 10:57:41 [2] 2 bizón
2023/01/15 10:57:41 [1] 190 veľryba
2023/01/15 10:57:41 [T] 214 Total
Vidíme, ako sa dáta tlačia do robotníkov: v tomto prípade prvý vybavil nosorožca, druhý odvážil slona, žirafu a veľrybu a posledný stihol hrocha a bizóna.
Bonus: Priamy výpis výsledkov
Ak chceme len priamy výpis výsledkov, kód sa zjednoduší.
Z agregácie je len výpis
Z agregácie stačí robiť výpis:
func processResults(resultChan <-chan int) {
for r := range resultChan {
log.Printf("[R] %d\n", r)
}
}
Distribúcia potrebuje výstupný kanál
Distribúcia výsledkov potrebuje niekoľko zmien:
func distributeWork(animalChan <-chan Animal, weights chan<- int) { (1)
const workerCount = 3
var wg sync.WaitGroup
wg.Add(workerCount)
for i := 0; i < workerCount; i++ {
go func(workerId int) {
worker(workerId, animalChan, weights)
wg.Done()
}(i)
}
wg.Wait() (2)
close(weights) (3)
}
1 | Žiadame aj výstupný kanál. |
2 | Čakáme na dobehnutie korutín. |
3 | Ak všetky korutiny dobehli, zatvárame výstupný kanál. |
Zatváranie a čakanie už nemusíme robiť v samostatnej korutine. Konzumovať budeme z inej korutiny, ktorá bude blokovať pri čítaní výsledkov. Deadlock nenastane, keďže |
Upravená funkcia main
Funkcia main
spustí trojicu aktérov:
-
zápis vstupných údajov pobeží v gorutine
-
rozdelenie roboty pobeží v samostatnej gorutine
-
čakanie na výsledky pobeží v hlavnej gorutine, kde súčasne blokuje predčasné ukončenie programu a zároveň blokuje pri čítaní z výsledkov.
func main() {
animalChan := make(chan Animal) (1)
weightChan := make(chan int)
go streamWork(animalChan)
go distributeWork(animalChan, weightChan) (2)
processResults(weightChan) (3)
}
1 | Vytvoríme kanál pre výsledky. |
2 | Pošleme ho do rozdeľovania roboty. |
3 | Spracovávame výsledky, kde blokujeme. |
Nezabudnime skontrolovať, kedy sa zatvorí výstupný kanál, ktorý prechádzame vo funkcii To sa stane po dobehnutí všetkých robotníckych gorutín, čo znamená koniec vstupných dát a teda koniec výsledkov. |