Gorutiny a atomické počítadlá

2023/01/06

Počítajme slová vo vetách s gorutinami!

package main

import (
	"log"
	"strings"
)

var input = `A ty mor ho! — hoj mor ho! detvo môjho rodu,
kto kradmou rukou siahne na tvoju slobodu;
a čo i tam dušu dáš v tom boji divokom:
Mor ty len, a voľ nebyť, ako byť otrokom.`

func main() {
	lines := strings.Split(input, "\n")
	for _, line := range lines {
		go countWords(line)
	}
}

func countWords(line string) {
	words := strings.Count(line, " ") + 1
	log.Printf("%d %s", words, line)
	return words
}

Chalupkov fragment najprv rozbijeme na riadky. Každý z nich pošleme do gorutiny, ktorá začne paralelne rátať počet slov a výsledok vypíše do logu.

Gorutina sa spúšťa pomocou slova go a volania funkcie, ktorú chceme spustiť paralelne.
Funkcia bežiaca ako gorutina nemôže vracať výsledok. Museli by sme naňho čakať, čím by sme popreli zmysel paralelne bežiacej funkcie, na ktorej výsledok sa nečaká. To je dôvod, prečo countWords nevracia nič.

Paralelne sa teda spustia štyri gorutiny — pre štyri riadky.

Atomické počítadlá

Keďže každá gorutina „vracia“ len celé číslo, a úlohou je spočítať čísla, stačí nám jedno atomické počítadlo.

Ide o počítadlo, ktoré vieme nastavovať atomicky, teda bezpečne uskutočniť nasledovné operácie:

  1. Zistiť stav počítadla.

  2. Navýšiť o príslušnú hodnotu

  3. Zapísať novú hodnotu.

Toto počítadlo bude zdieľané z viacerých gorutín, a hoci „zdieľané“ je v konkurentnom svete nebezpečné slovo, vďaka atomicite sa všetko vyrieši automaticky.

Prvý nástrel s čakaním

import (
	"fmt"
	"strings"
	"sync/atomic"
	"time"
)

func main() {
	wordCount := new(int32) (1)
	lines := strings.Split(input, "\n")
	for _, line := range lines {
		go func() { (2)
			words := int32(countWords(line)) (3)
			atomic.AddInt32(wordCount, words) (4)
		}()
	}
	// TODO dočasné riešenie!
	time.Sleep(4 * time.Second) (5)
	fmt.Printf("%d", *wordCount) (6)
}
1 Vytvoríme premennú pre počítadlo. Keďže budeme potrebovať premennú typu int32, rovno alokujeme pamäť a získame ju ako pointer.
2 Spustíme anonymnú funkciu ako korutinu.
3 Spočítame počet slov ako int a prevedieme ho na int32. Tieto dva typy sú prakticky synonymá, ale prevod musíme uviesť explicitne.
4 Pomocou funkcie AddInt32 vieme atomicky navýšiť počítadlo reprezentované pointerom na číslo typu int32.
5 Na začiatku urobíme zverstvo — počkáme si štyri sekundy na výsledok. Toto budeme musieť ihneď opraviť!
6 Výsledok vypíšeme.

Po spustení programu ubehnú 4 sekundy, za ktoré — dúfame — dobehnú všetky korutiny — a počítadlo ukáže:

36

No moment, nie je tam náhodou 37 slov?

Je.

Problém je na riadku:

words := int32(countWords(line))

Ak by sme si spustili go vet, ktorý skontroluje podozrivé konštrukcie cez

go vet <program.go>

uvideli by sme

loop variable line captured by func literal

Prostredie GoLand ukáže podobné varovanie:

Loop variables captured by 'func' literals in 'go' statements might have unexpected values

Keďže anonymná funkcia v korutine pristupuje k premennej lines zvonku. Takéto správanie je nepredvídateľné a rieši sa trikom.

for _, line := range lines {
    line := line (1)
    go func() {
        words := int32(countWords(line))  (3)
        atomic.AddInt32(wordCount, words) (4)
    }()
}
1 V cykle urobíme lokálnu premennú s názvom, ktorý prebije premennú cyklu. Túto „novú“ premennú už vieme spracovať korektne.
Dôvodom je fakt, že celý cyklus obvykle zbehne rýchlejšie než dobehnú korutiny a to, že premenné v iteráciách sú na tej istej adrese v pamäti a len sa v každej iterácii mení ich obsah.

Teraz už uvidíme korektný výsledok: 37.

Stále však čakáme!

Čakanie cez WaitGroup

WaitGroup sa dá použiť na vyčkávanie dobehnutia gorutín.

V Jave je ekvivalentom CountdownLatch.

WaitGroup je tiež akési počítadlo s nasledovnými schopnosťami:

  • Add: zvýši interné počítadlo. Používané pri spustení novej gorutiny.

  • Done: gorutina po dobehnutí zníži počítadlo

  • Wait: v hlavnej gorutine čakáme, kým sa počítadlo nezníži na nulu.

Ak odovzdávame premennú typu WaitGroup do funkcie, vždy musíme použiť pointer.
func main() {
	wg := new(sync.WaitGroup) (1)
	wordCount := new(int32)
	lines := strings.Split(input, "\n")
	for _, line := range lines {
		line := line
		wg.Add(1) (2)
		go func() {
			words := int32(countWords(line))
			atomic.AddInt32(wordCount, words)
			wg.Done() (3)
		}()
	}
	wg.Wait() (4)
	fmt.Printf("%d", *wordCount)
}
1 Vyrobíme pointer na WaitGroup.
2 Po spustení gorutiny zvýšime počítadlo.
3 Ak gorutina dobehne, znížime počítadlo.
4 Čakáme na dobehnutie, inak povedané, funkcia main pozastavená dovtedy, kým neodbehnú korutiny.

Ak teraz spustíme program, všetko bude korektné a bez čakania!

Program zráta 53 megabajtový korpus za 67 milisekúnd, pričom na rovnakom stroji je wc -w vykonaný za 182 milisekúnd
Všimnime si, že je možné pustiť obrovské množstvo gorutín — keďže ich réžia je maličká, nie je to problém.

Ako ďalej?

Náš program funguje, ale porušuje filozofiu konkurentnosti v Go:

Nekomunikujte cez zdieľanú pamäť - radšej zdieľajte pamäť komunikáciou.

V zložitejších prípadoch, kde si nevymieňame len čísla, je lepšie použiť kanály (channels).

>> Home