Przetwarzanie równoległe

W Julii mamy cztery typy obliczeń równoległych:

  1. Asynchroniczne zadania (tasks) - które mogą być zawieszane, przełączane, oczekiwać zakończenia przez inne zadania itp. Ściśle rzecz biorąc nie są to obliczenia równoległe, ale są takie kiedy zadania są wykonywane przez różne wątki

  2. Obliczenia wielowątkowe - zadania uruchamiane na wielu wątkach, na wielu procesorach, ale na tej samej maszynie, dzielące wspólną pamięć RAM

  3. Obliczenia rozproszone - obliczenia wykonywane na oddzielnych przestrzeniach pamięci na wielu procesorach lub komputerach

  4. Obliczenia na GPU

Na zajęciach zajmiemy się przypadkami 1, 2 i 3. Obliczenia na GPU wymagają pewnych przygotowań oraz przede wszystkim posiadania odpowiedniego GPU (którego nie możemy zapewnić) zaintersowanych odsyłam tu.

Zadania (tasks)

Kawałek kodu (zwykle funkcja) może zostać zadaniem (task). W odróżnieniu od zwykłych funkcji mamy swobodę przełączania się między zadaniami w dowolnej kolejności. Typowym zastosowaniem jest np. komunikacja z zewnętrzem, czekanie na transfer danych itp. podczas którego możemy wykonywać inne czynności i przerwać je gdy zajdzie potrzeba.

Zadanie tworzymy za pomocą makra @task, następujące wyrażenie (funkcja, itp.) zostanie zadaniem, ale nie zostanie uruchomione.

y = rand(1000)
t = @task begin; sleep(2); println("t: ", length(filter(x -> x > 0.5, y))); end

Zadanie uruchamiamy poleceniem schedule, zostanie ono uruchomione i program przejdzie do następnych poleceń. Jeżeli chemy od razu stworzyć i uruchomić zadanie możemy użyć polecenia @async.

schedule(t)
println("main: ", length(filter(x -> x > 0.5, y)))
@async begin; sleep(2); println("t2: done"); end

Jeżeli chcemy w którymś momencie zaczekać na zakończenie zadania, możemy użyć polecenia wait(t).

Przykład

Wyobraźmy sobie, że checmy sprawdzić jakie komputery odpowiadają na wywołanie w podsieci x.x.x.x/24, czyli np. 10.13.2.1-254. Użyjemy systemowego polecenia ping, których będziemy próbować wywołać komputer tylko jeden raz i czekać na odpowiedź co najwyżej 5 sekund. Polecenia zewnętrzne w Julii (podobnie jak w shellu) otaczane są odwrotnymi pojedynczymi cudczysłowami `. Polecenie można uruchomić funkcją run, albo jeżeli chcemy poznać odpowiedź read.

read(`ping -c 1 -w 5 10.19.16.2`, String)

W przypadku gdy dany adres nie odpowiada otrzymamy błąd, bo proces zwraca wynik różny niż zero.

read(`ping -c 1 -w 5 10.19.2.220`, String)

Skanowanie sieci kolejnymi wywołaniami może trwać długo - jeżeli na każdą odpowiedź czekamy maksymalnie 5 sekund i dopiero wtedy wysyłamy następny ping, to łącznie może to trwać ponad 20 minut (254 * 5 sekund). Możemy jednak uruchomić równolegle wiele procesów. Na koniec poczekamy aż wszystkie zadania skończą działanie i wyświetlimy listę adresów IP, które odpowiedziały.

function ping(ip)
    try
        s = read(`ping -c 1 -w 5 $ip`, String)
        return s
    catch err
        return ""
    end
end
N = 254
ip_tab = Array{String, 1}(undef, N)
tasks = Array{Task, 1}()
for i in 1:N
    push!(tasks, @async ip_tab[i] = ping("10.19.16.$i"))
end

for i in 1:N
    wait(tasks[i])
    if length(ip_tab[i]) > 0
        println("10.19.16.$i")
    end
end

Podobny efekt możemy osiągnąć używając funkcji asyncmap, która jest odpowiednikiem map, ale zamiast uruchamiać funckje dla każdego elementu kolekcji, uruchomi zadania

asyncmap(x->ping(x), 1:N);

Kanały (Channel)

Kanał to kolejka FIFO o zadanej długości (lub "nieskończona"), której mogą używać różne zadania do komunikacji i wymiany danych.

  • Channel{T}(n) - typ T, długość n; domyślnie Any, 0 - kolejka bez bufora, długość Inf oznacza typemax(Int)

  • put! dodaje element do kolejki

  • take! zabiera element z kolejki

  • fetch czyta element z kolejki (bez zabierania)

  • jeżeli kanał jest pusty funkcja take! czeka aż pojawią się dane

  • jeżeli kanał jest pełny funkcja put! czeka aż pojawi się miejsce

  • isready sprawdza obecność jakiegokolwiek elementu w kolejce

  • wait czeka na pojawienie się elementu

using GRUtils
using StatsBase

function worker(id, data, stop)
    while true
        if isready(stop)
            break
        end
        sleep(0.1)
        put!(data, id)
    end
    println("W-", id, " done")
end

sigstop = Channel{Bool}(1)
times = Channel{Int64}(Inf)
tasks = Array{Task, 1}()

N = 4

println("Start")

for i in 1:N
    push!(tasks, @async worker(i, times, sigstop))
end

sleep(5)
println("Stop send")
put!(sigstop, true)

for i in 1:N
    wait(tasks[i])
end

h = fit(Histogram, times.data, 1:1:N+1)
barplot(h.weights)
println(length(times.data))
display(gcf())

W ciągu 5 sekund każde z zadań dodało 50 elementów do kolejki z danymi (zgodne ze sleep(0.1)), łącznie 200 wpisów.

Spróbujmy operacji, która angażuje w większym stopniu CPU. Każdy z wątków będzie miał teraz za zadanie generowanie liczb losowych. Sprawdźmy ile czasu zajmuje wylosowanie 10 milionów liczb:

@time for i in 1:10_000_000
    rand()
end

Mniej więcej 0.1 sekundy. Możemy zatem każdemu z zadań kazać losować liczby.

Gdybyśmy wstawili powyższy kod zamiast polecenia sleep okazałoby się, że mamy bardzo obciążony procesor i nie możemy przerwać procesu bez zabicia go.

Dlaczego kod nie działa jak byśmy się spodziewali? Jeżeli sprawdzimy, które zadania wpisują wartości do kanału times, okaże się, że będzie to tylko zadanie 1. Zabiera ono cały czas procesora i ani kolejne zadania, ani nawet dalsza część programu nie jest dopuszczona do głosu. Dopiero dodanie polecenia sleep() po pętli losującej pozwala przełączyć zadania. Ale przecież w programowaniu równoległym nie o to chodzi!

function worker(id, data, stop)
    while true
        if isready(stop)
            break
        end
        for i in 1:10_000_000
            rand()
        end
        # To naprawia kod!
        sleep(0.001)
        put!(data, id)
    end
    println("W-", id, " done")
end

sigstop = Channel{Bool}(1)
times = Channel{Int64}(Inf)
tasks = Array{Task, 1}()

N = 4

println("Start")

for i in 1:N
    push!(tasks, @async worker(i, times, sigstop))
end

sleep(5)
println("Stop send")
put!(sigstop, true)

for i in 1:N
    wait(tasks[i])
end

h = fit(Histogram, times.data, 1:1:N+1)
barplot(h.weights)
println(length(times.data))
display(gcf())

Wątki

Przedstawione zadania nie są równoległe, bo używają pojedynczego wątku i wspólnej pamięci. Dlatego ich użycie nie przyspiesza np. obliczeń numerycznych, i są stosowane przede wszystkim do zadań I/O, aby nie blokować wykonywania innych części programu.

Prawdziwą równoległość umożliwiają wątki (threads). Domyślnie Julia startuje tylko z jednym wątkiem

Threads.nthreads()

Ale uruchomiona z flagą -t/–threads może mieć ich więcej

$ julia --threads 8

W przypadku notatnika Jupyter trzeba zdefiniować nowy rdzeń np. z 8 wątkami (pliki konfiguracyjne pojawiają się w .local/share/jupyter/kernels). Mój komputer ma teoretycznie 8 wątki (ale 4 rdzenie CPU).

julia> using IJulia
julia> IJulia.installkernel("Julia 8 Threads", env=Dict("JULIA_NUM_THREADS" => "8",))

Używanie wątków, oprócz oczywistych korzyści, nakłada także zobowiązania takie jak zapewnienie, że nie występuje wyścig do danych (data-race) potencjalne zakleszczenie (deadlock), zagłodzenie (starvation), oraz czytanie danych, które mogą być manipulowane przez wiele wątków jest bezpieczne.

Aby zapewnić brak negatywnych zjawisk istnieje kilka mechanizów - blokady oraz operacje atomistyczne.

Do ilustracji wyścigu użyjemy jednego z najprostszych mechanizów wielowąkowych - makra @threads, które automatycznie zrównolegla pętle for. W przykładzie używamy tablicy jednoelementowej ze względu na sposób przekazywania przez wskaźnik.

# Wersja jednowątkowa
s = zeros(Int64, 1)
for i in 1:1_000_000
    s[1] += 1
end
println(s)

W przypadku prostej pętli nie mamy żadnego problemu z oczywistem wynikiem dodawania jedynki milion razy

# Wielowątkowa
s = zeros(Int64, 1)
Threads.@threads for i in 1:1_000_000
    s[1] += 1
end
println(s)

Naiwna wersja wielowątkowa po prostu dodaje makro przed pętlą for, ale nie dba o wystąpienie zjawiska wyścigu. Bierze się on z faktu, iż wątki wykonują operację dodawania w rzeczywistości nie w jednym, a w trzech krokach - pobierz, dodaj, wpisz. W związku z tym pomiędzy pobraniem wartości zmiennej, a wpisaniem nowej wartości, inny wątek może już zmienić jej wartość.

WątekOperacjaWartość x
1pobiera x=00
2pobiera x=00
1dodaje 0+10
2dodaje 0+10
1wpisuje x=11
2wpisuje x=11
# Atomistyczne
s = Threads.Atomic{Int}(0)
Threads.@threads for i in 1:1_000_000
    Threads.atomic_add!(s, 1)
end
println(s.value)

Operacja atomistyczna (jednostkowa) zapewnia, że cała czynność musi być wykonana w jednym kroku (choć na poziomie instrukcji procesora nadal musi być ich kilka). Dzięki temu dodawanie atomistyczne (atomic_add!) daje dobry wynik.

Ten sam efekt można osiągnąć stosując blokadę (lock), która zapewnia, że tylko jeden wątek wykonuje krytyczną operację jednocześnie. Oczywiście w tym prostym przypadku operacja atomistyczna jest prostsza w implementacji, ale mechanizm blokad jest znacznie bardziej elastyczny i w bardziej skomplikowanym przypadku może być niezbędny.

Podstawowy schemat użycia blokady to:

lock(lk)
    try
        # operacja
    finally
        unlock(lk)
    end
function increase(s, x, lk)
    lock(lk)
    try
        s[1] += x
    finally
        unlock(lk)
    end
end

rl = ReentrantLock()
s = zeros(Int64, 1)
Threads.@threads for i in 1:1_000_000
    increase(s, 1, rl)
end
println(s)

Makro @threads działa tylko na pętle for i jest stworzone dla wygody, ponieważ jest to najczęstsze miejsce gdzie tanim kosztem można wprowadzić równoległe przetwarzanie.

W ogólnym przypadku uruchamianie wątków jest analogiczne do uruchamiania zadań. Makro @spawn jest odpowiednikiem @async. Pozostałe mechanizmy (wait, channel, itp.) można używać jak w programowaniu asynchronicznym. Różnicą jest oczywiście uruchamianie wątków na osobnych wątkach CPU i lepsze wykorzystanie mocy obliczeniowych komputera.

Spróbujmy teraz jaki jest zysk z używania od 1 do 4 wątków (uwaga: zbyt duża liczba w stosunku do prawdziwych rdzeni może spowodować ten sam problem co wcześniej z zadaniami, w takim przypadku trzeba wyłączyć rdzeń Kernel-Shutdown !).

using GRUtils
using StatsBase

function worker(id, data, stop)
    while true
        if isready(stop)
            break
        end
        for i in 1:10_000_000
            rand()
        end
        put!(data, id)
    end
    println("W-", id, " done")
end

sigstop = Channel{Bool}(1)
times = Channel{Int64}(Inf)
tasks = Array{Task, 1}()
N = 4

println("Start")

for i in 1:N
    push!(tasks, Threads.@spawn worker(i, times, sigstop))
    println(" added W-", i)
end

sleep(5)
println("Stop send")
put!(sigstop, true)

for i in 1:N
    wait(tasks[i])
end

h = fit(Histogram, times.data, 1:1:N+1)
barplot(h.weights)
println(length(times.data))
display(gcf())

Przetwarzanie wieloprocesorowe i rozproszone

Pełne wykorzystanie zasobów współczesnych komputerów posiadających wiele procesorów, czy klastrów komputerów lub wręcz wykorzystanie rozproszonych fizycznie komputerów wymaga jeszcze innego podejścia, w którym nie tylko CPU, ale i pamięć RAM jest rozdzielona pomiędzy procesory. Rodzi to dodatkowe problemy w programowaniu i zasadniczo różni się ono od tego co widzieliśmy dotychczas.

Aby uruchomić Julię korzystając z wielu procesów podajemy parametr -p n, co automatycznie ładuje bibliotekę Distributed i n dodatkowych procesów. Oczywiście sensownie jest ustalić tę wielkość na nie więcej niż liczba fizycznych rdzeni procesora. Każdy proces ma swój indetyfikator zaczynający się od 1 (jeżeli uruchomiona jest interaktywna sesja, to zawsze ma numer 1). Numer można sprawdzić przez myid(), a listę wszystkich "robotników" (workers - procesy poza głównym) przez workers(). Dodawać procesy można także ładując bibliotekę i używając funkcji addprocs.

Uwaga! W tym miejscu najlepiej zrestartować notatnik po wcześniejszych testach z wątkami. Każdy proces może mieć bowiem kilka wątków i możemy ich mieć już za dużo!

using Distributed

println("myid: ", myid())
println(workers())
println(nprocs())
println(nworkers())

addprocs(2)
println(workers())
println(nprocs())
println(nworkers())

Podstawową niskopoziomową strukturą jest uruchomienie wyrażenia (funkcji, itp.) na procesie n poprzez remotecall. Zwraca ono obiekt Future, który jest zastępczym obiektem dla wyniku obliczeń, które mają nieznany stan i czas zakończenia.

remotecall(funkcja, worker_id, argumenty...)

Funkcja fetch czeka na zakończenie procesu i czyta zwracaną wartość.

r = remotecall(rand, 2, Int64, 10)
fetch(r)

Makro @spawnat p wyrażenie uruchamia wyrażenie na procesie p, zamiast numeru można użyć :any i wtedy proces zostanie wybrany automatycznie.

r = @spawnat :any rand(10)
fetch(r)

Kod który uruchamiamy musi być dostępny dla wszystkich procesów. Jeżeli zdefinujemy np. jakąś funkcję w sesji interaktywnej (proces 1), to inne procesy nie będą jej znały

function f()
    rand(10)
end

r = @spawnat 2 f()
fetch(r)

Makro @everywhere pozwala na załadowanie wyrażenia na wszystkich procesach

@everywhere function f()
                rand(10)
end
r = @spawnat 2 f()
fetch(r)

Ponieważ każdy proces ma swoją pamięć RAM, czasami konieczne jest przesyłanie danych pomiędzy procesami. Jest to zwykle wąskie gardło obliczeń równoległych i powinno być minimalizowane. Najlepiej jest tak zorganizować algorytm czy kod, aby zwracał on wynik obliczeń i minimalizował przesyłanie danych.

Odpowiednikiem kanałów jest RemoteChannel. Inne struktury też mają swoje odpowiedniki np. SharedArrays. Użyjemy tych dwóch struktur w przykładzie poniżej.

Przykład

Spróbujemy zbudować następujący wieloprocesorowy program, w którym dane będą generowane dane przez niezależne procesy, a trzeci proces będzie zajęty rysowaniem wykresu (histogramu). Jeden generator będzie losował szum, drugi rozkład normalny. Obydwa będą wpisywały wyniki do współdzielonej tablicy. Użyjemy podobnego mechanizmu zatrzymywania co wcześniej, poprzez wysłanie sygnału do wszystkich procesów.

using Distributed
using SharedArrays
using GRUtils

addprocs(2)

@everywhere using Distributions
nprocs()

Generator szumu będzie produkował dane z rozkładu płaskiego w całym zakresie histogramu. Zwróci na koniec liczbę punktów, którą wpisał do histogramu (będziemy mogli sprawdzić czy nie straciliśmy danych).

@everywhere function generator_noise!(sigstop, histogram)
    println("Starting noise generator ", myid())
    nmax = length(histogram)
    n = 0
    while true
        if isready(sigstop)
            break
        end
        i = round(Int64, rand() * nmax, RoundUp)
        if 0 < i <= nmax
            histogram[i] += 1
            n += 1
        end
    end
    println("Noise generator done")
    n
end

Drugi generator będzie produkował dane z rozkładu normalnego

@everywhere function generator_peak!(sigstop, histogram)
    println("Starting peak generator ", myid())
    nmax = length(histogram)
    normal = Normal(nmax / 2, 100.0)
    n = 0
    while true
        if isready(sigstop)
            break
        end
        i = round(Int64, rand(normal), RoundUp)
        if 0 < i <= nmax
            histogram[i] += 1
            n += 1
        end
    end
    println("Peak generator done")
    n
end

Potrzebujemy także funkcji rysującej bieżący histogram. Oprócz danych i sygnałów stopu zdefinujemy co ile sekund wykres ma się odświeżać.

function plotter(sigstop, histogram, fig, update)
    println("Starting plotter ", myid())

    gcf(fig)
    while true
        if isready(sigstop)
            break
        end
        stairs(histogram)
        xlim(0, 16384)
        xticks(1000, 5)
        ylim(0, maximum(histogram) + 5)
        display(gcf())
        sleep(update)
    end
    println("Plotter done")
end

Zdefinujemy teraz sygnał stopujący (RemoteChannel), histogram (SharedArray) i wykres. Funckja rysująca będzie zadaniem, bo ma odświeżać wykres tylko co jakiś czas. Zmienna trun zdefinuje czas działania w sekundach, po jakim wysłany zostanie sygnał stopujący. W jupyterze niestety wykresy dodają się jeden pod drugim (w REPL-u są odświeżane).

trun = 10

sigstop = RemoteChannel(()->Channel{Bool}(1))
histogram = SharedArray{Int64, 1}(16384)

fig = Figure((800, 600), "pix")
display(gcf())

p = @task plotter(sigstop, histogram, fig, 0.1)
schedule(p)

gn = @spawnat :any generator_noise!(sigstop, histogram)
gp = @spawnat :any generator_peak!(sigstop, histogram)

sleep(trun)
put!(sigstop, true)

sn = fetch(gn)
sp = fetch(gp)
println(sum(histogram), " events in histogram")
println(sn + sp, " events generated")
plot(histogram)

Fizycznie różne komputery

Co jeżeli chcemy kod uruchomić na wielu, fizycznie rozdzielonych komputerach (nie na wielu procesorach jednego komputera). Okazuje się, że powyższy kod będzie działał tak samo. Przy uruchamianiu Julii możemy podać listę maszyn, na których będziemy uruchamiać w sposób rozproszony kod. Dla każdej maszyny kod będzie uruchomiony domyślnie w tej samej ścieżce i dla tego samego użytkownika jak na gospodarzu. Używany jest protokół ssh z logowaniem bez haseł (np. za pomocą kluczy). Po wpisaniu listy maszyn (i ewentualnych opcji) do pliku (np. machines.txt) możemy uruchomić Julię z opcją --machine-file

$ julia [-t n] [-p m] --machine-file machines.txt program.jl

Jak widać każda z maszyn może użwać kilku procesów, a każdy proces kilku wątków!

Plik machines.txt może wyglądać np. tak

10.13.2.220
andrew.zfj.fuw.edu.pl

Wygodnym poleceniem w przetwarzaniu rozproszonym jest pmap(f, c) (równoległa wersja funkcji map). Dla wszystkich elementów kolekcji c (która w zamyśle definiuje problem, fragmenty obliczeń itp.) uruchamia funkcję f (w zamyśle proces obliczeń zwracający wynik), a obliczenia są automatycznie rozdzielane pomiędzy dostępne procesy i maszyny.

CC BY-SA 4.0 Krzysztof Miernik. Last modified: December 04, 2023. Website built with Franklin.jl and the Julia programming language.