Recepty pre gorutiny a kanály – worker pool

2023/01/15

Minule sme vážili zvieratá na viacerých gorutinách. Ukážme si situáciu, keď máme worker pool, teda bank konkrétneho počtu gorutín, ktoré si vyberajú robotu zo spoločného frontu, spracovávajú a výsledky posielajú do nového spoločného frontu.

Začnime opäť vážiť zvieratá.

Kostra

V kostre si len vytvorme kanál pre zvieratá, do ktorého budeme posielať jedince súce na váženie.

Rovno povedzme, že toto skončí deadlockom.

package main

func streamWork(animalChan chan<- Animal) {
	for _, animal := range animals {
		animalChan <- animal (2)
	}
	close(animalChan) (3)
}

func main() {
	animalChan := make(chan Animal) (1)
	streamWork(animalChan) (4)
}

type Animal struct {
	species string
	weight  int
}

var animals = []Animal{
	Animal{"slon", 12},
	Animal{"hroch", 4},
	Animal{"nosorožec", 4},
	Animal{"žirafa", 2},
	Animal{"bizón", 2},
	Animal{"veľryba", 190},
}
1 Nebufferovaný kanál, z ktorého si budú neskôr pracujúce gorutiny vyberať úlohy.
2 Pošleme doňho zvieratá.
3 Na konci kanál zavrieme.
4 Zavoláme funkciu, ktorá pošle údaje so zvieratami do komunikačného nebufferovaného kanála.

Toto je samozrejme deadlock: z kanála nik nečíta a program obratom skončí.

Distribúcia práce

Distribuujme teraz prácu cez „manažéra“, ktorý bude čítať z kanála úloh a rozdeľovať prácu.

Na začiatku ju rozdelí sám sebe.

func distributeWork(animalChan <-chan Animal) { (1)
	for animal := range animalChan {
		log.Printf("%v\n", animal)
	}
}
1 Čítame z kanála, kým sa kanál neuzavrie a zvieratá len vypisujeme.

Použitie v kóde? Dôležité je predísť deadlocku.

Keďže máme synchrónny nebufferovaný kanál, nemôžeme najprv posielať zvieratá na váženie a potom čakať na úlohy. To by bol jasný deadlock — producent by nemal konzumenta.

Nemôžeme to ani urobiť naopak, pretože by manažér-konzument čakal na producenta, ktorý by sa spustil až neskôr.

Rozseknime go-rdický uzol a spustime jednu z týchto súčiastok v gorutine — a zvoľme si za ňu funkciu, ktorá zaradí údaje do kanála.

func main() {
	animalChan := make(chan Animal)

	go streamWork(animalChan) (1)
	distributeWork(animalChan) (2)
}
1 V gorutine pustíme „prúd“ zvierat, ktoré sa ocitnú v komunikačnom kanáli.
2 Začneme distribuovať prácu.

Distribúcia práce vo funkcii distributeWork blokuje hlavnú gorutinu (a bráni predčasnému ukončeniu programu).

Distribúcia práce

  • priebežne čaká na zápis jednotlivých zvierat do animalChan

  • zároveň očakáva koniec údajov v kanáli

Deadlock však nenastáva — jednak pre komunikáciu medzi dvoma korutinami a zároveň kvôli jasnému explicitnému uzatvoreniu kanála.

Robotníci pre tri gorutiny vo Worker Pool.

Začnime naozaj rozdeľovať prácu, a nielen simulovať!

Vytvorme si samostatnú funkciu pre robotnícku triedu, pardón, pre robotnícku funkciu.

Tá bude veľmi pomaly vážiť veľmi veľké zvieratá.

func worker(workerId int, animalChan <-chan Animal) { (1)
	for a := range animalChan { (2)
		time.Sleep(1 * time.Second)
		log.Printf("[%d] %d\t%s\n", workerId, a.weight, a.species) (3)
	}
}
1 Funkcia zoberie identifikátor (pre ladiace účely) a kanál, z ktorého bude načítavať zvieratá.
2 Zvieratá berieme z komunikačného kanála, kým sa kanál nezavrie.
3 Vážime a vypisujeme.
Kto zatvára kanál? Producent, teda funkcia streamWork. To je dôležitá obrana proti deadlocku.

Upravme aj manažment:

func distributeWork(animalChan <-chan Animal) {
	const workerCount = 3
	for i := 0; i < workerCount; i++ {
		go worker(i, animalChan) (1)
	}
}
1 Spusťme tri — nie viac, nie menej — gorutiny. Pošlime im vstupný kanál.
Gorutiny budú súperiť o zvieratá v komunikačnom kanáli.

Spusťme firmu na zvieratá

Ak teraz spustíme main, neuvidíme nič. Povaha distributeWork sa zmenila — už len vystrelí salvu gorutín, tak ako v predošlých životných situáciách, a okamžite skončí.

Keď to chceme urobiť hnusne, budeme čakať.

func main() {
	animalChan := make(chan Animal)

	go streamWork(animalChan)
	distributeWork(animalChan)

	time.Sleep(3 * time.Second) (1)
}
1 Hnusne čakáme.

Výstup pošle dve salvy za sekundu. Sekundu totiž trvá, kým každý worker spracuje výsledok.

2023/01/14 20:08:09 [2] 4       nosorožec
2023/01/14 20:08:09 [0] 12      slon
2023/01/14 20:08:09 [1] 4       hroch
2023/01/14 20:08:10 [2] 2       žirafa
2023/01/14 20:08:10 [1] 190     veľryba
2023/01/14 20:08:10 [0] 2       bizón

Čakanie je však škaredé.

Ak chceme čakať na dobehnutie manažéra, znamená to čakať na koniec gorutín. Je teda čas na WaitGroup.

func distributeWork(animalChan <-chan Animal) {
	const workerCount = 3

	var wg sync.WaitGroup
	wg.Add(workerCount) (1)
	for i := 0; i < workerCount; i++ {
		go func(workerId int) { (2)
			worker(workerId, animalChan)
			wg.Done() (3)
		}(i) (2)
	}
	wg.Wait() (4)
}
1 Pripravíme si WaitGroup s takým počtom gorutín, koľko máme robotníkov.
2 Spustíme v gorutine robotníka. Použijeme vnorenú funkciu, do ktorej dopravíme identifikátor robotníka cez parameter, pretože využívame premennú i z iterácie.
3 Keď robotník skončí, znížime počítadlo vo WaitGroup.
4 Čakáme, kým dobehnú všetky gorutiny.

Takto už nemusíme škaredo čakať:

func main() {
	animalChan := make(chan Animal)

	go streamWork(animalChan)
	distributeWork(animalChan)
}

Program skončí zhruba po 2 sekundách, keď dobehnú obe salvy troch gorutín.

Váženie s výsledkami

Ak chceme, aby gorutiny komunikovali späť, použijeme recepty z predošlých dielov.

  1. Funkcia worker bude posielať výsledky do samostatného kanála.

  2. Funkcia distributeWork bude čakať na výsledky.

  3. A vytvoríme samostatnú funkciu na agregáciu výsledných váh.

Robotníčky odpovedajú do kanála

func worker(workerId int,
            animalChan <-chan Animal,
            weightChan chan<- int) { (1)
	for a := range animalChan {
		time.Sleep(1 * time.Second)
		log.Printf("[%d] %d\t%s\n", workerId, a.weight, a.species)
		weightChan <- a.weight (2)
	}
}
1 Očakávame tretí parameter: kanál pre navážené hmotnosti.
2 Každú hmotnosť zapíšeme do kanála.

Agregácia výsledkov

Agregácia výsledkov načíta údaje a zlúči:

func aggregateResults(weights <-chan int) int {
	total := 0
	for weight := range weights {
		total += weight
	}
	return total
}

Distribúcia roboty prekopaná

Musíme prekopať distribúciu práce:

  1. Zavedieme výstupný kanál.

  2. Počkáme na výsledky.

  3. Agregujeme.

func distributeWork(animalChan <-chan Animal) int {
	const workerCount = 3

	weights := make(chan int) (1)
	var wg sync.WaitGroup
	wg.Add(workerCount)
	for i := 0; i < workerCount; i++ {
		go func(workerId int) {
			worker(workerId, animalChan, weights) (2)
			wg.Done()
		}(i)
	}
	go func() { (3)
		wg.Wait()
		close(weights)
	}()

	return aggregateResults(weights) (4)
}
1 Vytvoríme kanál pre výstupné výsledky.
2 Robotnícka gorutina dostane výstupný kanál ako argument.
3 Čakáme na výsledky v samostatnej gorutine. Toto je presne vzor z predošlého dielu.
4 Ak dôjdu všetky dáta, agregujeme a výsledok vrátime z funkcie.

Úprava hlavnej funkcie

Samozrejme, musíme upraviť aj hlavnú funkciu:

func main() {
	animalChan := make(chan Animal)

	go streamWork(animalChan)
	total := distributeWork(animalChan) (1)

	log.Printf("[T] %d\t%s\n", total, "Total") (2)
}
1 Jednoducho rozdelíme robotu a získame výsledok.
2 Výsledok vypíšeme.

Zhrnutie

To je všetko, robota sa veselo rozdeľuje!

Ak odstránime čakanie vo funkcii worker, uvidíme priame a rýchle rátanie:

2023/01/15 10:57:41 [1] 12      slon
2023/01/15 10:57:41 [1] 2       žirafa
2023/01/15 10:57:41 [0] 4       nosorožec
2023/01/15 10:57:41 [2] 4       hroch
2023/01/15 10:57:41 [2] 2       bizón
2023/01/15 10:57:41 [1] 190     veľryba
2023/01/15 10:57:41 [T] 214     Total

Vidíme, ako sa dáta tlačia do robotníkov: v tomto prípade prvý vybavil nosorožca, druhý odvážil slona, žirafu a veľrybu a posledný stihol hrocha a bizóna.

Bonus: Priamy výpis výsledkov

Ak chceme len priamy výpis výsledkov, kód sa zjednoduší.

Z agregácie je len výpis

Z agregácie stačí robiť výpis:

func processResults(resultChan <-chan int) {
	for r := range resultChan {
		log.Printf("[R] %d\n", r)
	}
}

Distribúcia potrebuje výstupný kanál

Distribúcia výsledkov potrebuje niekoľko zmien:

func distributeWork(animalChan <-chan Animal, weights chan<- int) { (1)
	const workerCount = 3

	var wg sync.WaitGroup
	wg.Add(workerCount)
	for i := 0; i < workerCount; i++ {
		go func(workerId int) {
			worker(workerId, animalChan, weights)
			wg.Done()
		}(i)
	}
	wg.Wait() (2)
	close(weights) (3)
}
1 Žiadame aj výstupný kanál.
2 Čakáme na dobehnutie korutín.
3 Ak všetky korutiny dobehli, zatvárame výstupný kanál.

Zatváranie a čakanie už nemusíme robiť v samostatnej korutine. Konzumovať budeme z inej korutiny, ktorá bude blokovať pri čítaní výsledkov.

Deadlock nenastane, keďže distributeWork a čítanie z výsledkov pobežia v odlišných gorutinách.

Upravená funkcia main

Funkcia main spustí trojicu aktérov:

  • zápis vstupných údajov pobeží v gorutine

  • rozdelenie roboty pobeží v samostatnej gorutine

  • čakanie na výsledky pobeží v hlavnej gorutine, kde súčasne blokuje predčasné ukončenie programu a zároveň blokuje pri čítaní z výsledkov.

func main() {
	animalChan := make(chan Animal) (1)
	weightChan := make(chan int)

	go streamWork(animalChan)
	go distributeWork(animalChan, weightChan) (2)
	processResults(weightChan) (3)
}
1 Vytvoríme kanál pre výsledky.
2 Pošleme ho do rozdeľovania roboty.
3 Spracovávame výsledky, kde blokujeme.

Nezabudnime skontrolovať, kedy sa zatvorí výstupný kanál, ktorý prechádzame vo funkcii processResult.

To sa stane po dobehnutí všetkých robotníckych gorutín, čo znamená koniec vstupných dát a teda koniec výsledkov.

>> Home