0xDEADBEEF

RSS odkazy
««« »»»

Běží obě strany unixové pipe paralelně?

11. 4. 2022, aktualizováno: 17. 5. 2022

Představte si situaci, že jste v BASHi spustili tohle:

zstdcat data.zst | ./consume

Otázka zní: Budou oba programy běžet paralelně?

Technicky vzato ano. Oba procesy existují v jednu chvíli a pokud máme víc než jedno jádro, mohou najednou i běžet.

Ale jak jsou na tom prakticky? Budou dva procesy zároveň dělat užitečnou práci? To záleží na chování pipe/trubky, která je spojuje.

Představte si situaci, kdy zstdcat má interní buffer, řekněme 1 MB, do nějž dekomprimuje data. Když se tento naplní, začne je zapisovat do standardního výstupu. Protože (řekněme) druhý proces zpracovává data stejnou rychlostí, jako je zstd dekomprimoval, interní buffer trubky (na současném linuxu 64 kB ve výchozím stavu) se okamžitě zaplní a je vyprazdňován rychlostí konzumenta. Pokud je zstd takto implementován, musí čekat, než konzument buffer vyčistí, aby mohl zapsat všechna data a teprve pak začne dekomprimovat další. Systém se tak nachází ve dvou stavech: buď zstd pracuje a konzument stojí, protože nemá data nebo konzument zpracovává data a zstd stojí, protože je trubka plná. Jediná paralelní práce nastává během přesahu na krajích těchto fází, kdy zstd zapsal poslední bajt, začíná zase dekomprimovat, ale buffer trubky poskytuje stále nějakou práci pro konzumenta. V tomhle případě běží paralelně 64k/1M = 6.25% času.

Nevím přesně jak je zstd implementovaný, ale měření ukazuje, že to bude asi tak nějak.

time { zstdcat data.zst | wc -c; }

real  0m3,282s
user  0m2,670s
sys   0m1,142s

Mě zajímá hlavně položka real (čas od začátku do konce). Tady 3.3 s.

time { zstdcat data.zst | ./consume; }

real  0m5,799s
user  0m7,218s
sys   0m1,223s

Teď celkem 5.8 vteřiny. Program ./consume je v tomto případě jednoduchý JSON lexer1 , který bez problémů zvládá 1.2 GB/s na prastarém Sandy Bridge CPU.


Nejjednodušším řešením pro zvýšení paralelismu je zvětšit buffer trubky. (Ukázka v jazyce D.)

import core.sys.posix.fcntl;
fcntl(stdin.fileno, /*F_SETPIPE_SZ*/ 1024 + 7, 1048576);

Maximum, které může nastavit neprivilegovaný uživatel je uvedeno v /proc/sys/fs/pipe-max-size. U mě jeden megabajt.

Pak to vypadá takhle:

time { zstdcat data.zst | ./consume; }

real  0m3,333s
user  0m5,491s
sys   0m0,963s

Tohle je téměř stejně rychlé jako když se výsledek pumpuje do prostého wc, které nedělá téměř nic, a je vidět, že práce probíhá paralelně: real = 3.3 s, user + sys = 6.45 s, téměř 2× tolik.

Na otázku proč je user time jako míra práce provedená oběma procesy v userspace (bez času čekání na blokující syscally) teď menší, odpověď neznám. Nicméně perf u verze s velkým F_SETPIPE_SZ hlásí stejný počet instrukcí, ale drasticky menší počet událostí context-switchescpu-migrations (136k vs. 44k a 375 vs 94).

Program consume čte stdin přes foreach (line; stdin.byLine) {}, to je pod kapotou implementováno pomocí getdelim a to jako součást céčkovského stream I/O, čte data ve výchozím stavu po 4 kB blocích (potvrzeno via strace). Šátrání ve zdrojácích glibc napovídá, že velikost čtení je rovna velikosti interního userspace bufferu daného streamu. Ten se dá taky snadno změnit.2

auto len = 64 * 1024;
auto buff = malloc(len)[0 .. len];
stdin.setvbuf(buff);

Ke zrychlení dojde, i když není tak velké jako změna velikosti kernelového bufferu (real = 3.8 s). Bez té změny je stále limitován na 64 kB dat v jedné read operaci. Na druhou stranu tohle lépe amortizuje náklady syscallů a zdvojí efektivní velikost bufferu. (Konzument má ve svém userspace bufferu 64 kB dat a dalších 64 kB čeká v kernelovém bufferu.)

Kombinace obojího (F_SETPIPE_SZ + setvbuf) pak dále o něco málo stlačí výsledný čas blíže ke 3 vteřinám.


Dalším postupem může být přepnutí trubky do neblokujícího režimu.

auto fl = fcntl(stdin.fileno, F_GETFL);
fcntl(stdin.fileno, F_SETFL, fl | O_NONBLOCK);

Teď, pokud by read nebo write syscall blokoval z důvodu nedostatku místa nebo prázdné trubky, neblokuje a namísto toho okamžitě vrátí EAGAIN.

S tímhle je možné napsat program tak, aby se agresivně snažil plnit buffer trubky a když je plná a dostane EAGAIN, přestane plnit a začne dělat jinou užitečnou práci. Nedochází k dichotomii, kdy buď pracuje nebo zablokovaný čeká na zápis a i s malým bufferem se dá dosáhnout dobrého paralelismu.

Na druhou stranu, když stačí zvednout velikost jednoho nebo druhého bufferu, proč se vůbec snažit s komplikovaným schématem, které je náchylné k chybám.

Pro realističtější představu: S uvedený změnami program, který přes trubku četl JSON data z zstdcat a prováděl nad nimi jednpoduché agregace, zrychlil z 500-600 MB/s na 800-900 MB/s.


K tématu:

How fast are Linux pipes anyway?


  1. Provizorně lexeru říkám mwjlmostly wrong json library. Je z větší části chybný, protože předpokládá, že vstupní JSON je nejen validní, ale zároveň má i určitý specifický formát. Jmenovitě stringy obsahují minimum escapování (jen \\ a \") a unicode znaky jsou uloženy přímo, ne v formě \u sekvencí. To dramaticky zjednoduší dekódovací logiku do té míry, že jde o jeden switch a jednoduché skenování. Knihovna neprovádí žádnou validaci, nikdy neindikuje, že došlo k chybě a vše, čemu nerozumí, jednoduše ignoruje. Vypadá to jako docela drastické omezení použitelnosti, ale pokud vím odkud JSON přišel a jak byl vyprodukován, může tohle stačit. Nehledě na to, že když potřebuji jen několik věcí, můžu lex předčasně ukončit a dál práci zrychlit.
  2. Tady je nutné použít malloc nebo si dávat velice dobrý pozor při použití paměti spravované GC. setvbuf předává pointer internímu C API, do nějž GC nevidí a neví, že na blok paměti vede jedna reference. Když si člověk nedá pozor, GC usoudí, že jde o mrtvou paměť (reference není použitá potom, co se pošle do setvbuf), zrecykluje ji a najednou je v programu kritická chyba. Proto se nevyplatí riskovat a použít malloc.
píše k47 (@kaja47, k47)