Pendahuluan
Primitif konkurensi pada Go mempermudah kita membangun aliran data pipeline yang menggunakan I/O dan CPU dengan efisien. Artikel ini menampilkan contoh-contoh dari pipeline tersebut, menyoroti kesalahan yang mungkin muncul saat operasi gagal, dan memperkenalkan teknik-teknik untuk berurusan dengan kegagalan secara bersih.
Apa itu pipeline?
Tidak ada definisi formal dari sebuah pipeline dalam Go; ia adalah salah satu dari banyak jenis program yang konkuren. Secara informal, sebuah pipeline adalah suatu urutan tahap-tahap yang dihubungkan oleh kanal, yang mana setiap tahap adalah sebuah grup dari goroutine yang menjalankan fungsi yang sama. Dalam setiap tahap, setiap goroutine
-
menerima nilai dari hulu lewat kanal masuk
-
memroses data lewat fungsi, biasanya menghasilkan nilai baru
-
mengirim nilai ke hilir lewat kanal keluar
Setiap tahap memiliki sejumlah kanal masuk dan keluar, kecuali tahap yang pertama dan terakhir, yang mana hanya memiliki kanal keluar atau masuk, secara berurutan. Tahap yang pertama biasanya disebut dengan sumber atau produser; tahap yang terakhir biasanya disebut dengan sink atau konsumer.
Kita akan memulai dengan sebuah contoh pipeline sederhana untuk menjelaskan ide-ide dan teknik-teknik tersebut. Nanti, kita akan memperlihatkan contoh yang lebih nyata.
Memangkatkan bilangan
Bayangkan sebuah pipeline dengan tiga tahap.
Tahap pertama, gen
, yaitu sebuah fungsi yang mengonversi deretan menjadi
menjadi sebuah kanal.
Fungsi gen
menjalankan sebuah goroutine yang mengirim setiap parameter
integer yang ia terima ke dalam sebuah kanal dan menutup kanal tersebut saat
semua nilai telah dikirim.
func gen(nums ...int) <-chan int { out := make(chan int) go func() { for _, n := range nums { out <- n } close(out) }() return out }
Tahap kedua, sq
, menerima sejumlah nilai integer dari sebuah kanal dan
mengembalikan sebuah kanal yang berisi pangkat dari setiap integer yang ia
terima.
Setelah kanal masuk ditutup (yang berarti semua nilai integer telah
diterima) dan tahap ini telah mengirim semua nilai pangkat ke hilir, ia akan
menutup kanal keluar.
func sq(in <-chan int) <-chan int { out := make(chan int) go func() { for n := range in { out <- n * n } close(out) }() return out }
Fungsi main
menyiapkan pipeline dan menjalankan tahap yang terakhir:
menerima nilai dari tahap kedua dan mencetaknya satu per satu, sampai kanal
ditutup.
func main() { // Siapkan pipeline. c := gen(2, 3) out := sq(c) // Konsumsi kanal keluar. fmt.Println(<-out) // 4 fmt.Println(<-out) // 9 }
Secara sq
memiliki tipe kanal yang sama untuk yang masuk dan keluar, kita
dapat menulisnya beberapa kali.
Kita juga dapat menulis fungsi main
sebagai pengulangan dengan range
,
seperti pada tahap-tahap lainnya:
func main() { // Siapkan pipeline dan konsumsi kanal keluar. for n := range sq(sq(gen(2, 3))) { fmt.Println(n) // 16 kemudian 81 } }
Fan-out, fan-in
Beberapa fungsi dapat membaca dari kanal yang sama sampai kanal tersebut ditutup; hal ini disebut dengan fan-out. Cara ini membolehkan distribusi kerja antara sekelompok goroutine supaya penggunaan CPU dan I/O paralel.
Sebuah fungsi dapat membaca dari beberapa input sampai semua input ditutup dengan cara menggabungkan semua kanal input menjadi kanal tunggal yang ditutup saat semua input telah ditutup. Cara ini disebut dengan fan-in.
Kita dapat mengubah pipeline sebelumnya untuk menjalankan dua fungsi sq
,
yang membaca dari kanal input yang sama.
Untuk itu, kita perlu membuat sebuah fungsi baru, merge
, yang menggabungkan
(fan-in) semua hasil dari sq
:
func main() { in := gen(2, 3) // Distribusi pekerjaan lewat dua goroutine `sq` yang membaca `in` yang // sama. c1 := sq(in) c2 := sq(in) // Konsumsi gabungan keluaran dari c1 dan c2. for n := range merge(c1, c2) { fmt.Println(n) // 4 lalu 9, atau 9 lalu 4 } }
Fungsi merge
mengonversi sejumlah kanal menjadi sebuah kanal tunggal dengan
menjalankan sebuah goroutine untuk setiap kanal input dan menyalin nilainya ke
sebuah kanal keluar tunggal.
Saat semua goroutine telah dimulai, fungsi merge
menjalankan lagi sebuah
goroutine untuk menutup kanal keluar saat semua pengiriman ke kanal keluar
tersebut selesai.
Pengiriman ke kanal yang telah ditutup akan menyebabkan panic, jadi sangat penting untuk memastikan semua pengiriman telah selesai sebelum menutup kanal keluar tersebut. Tipe sync.WaitGroup menyediakan cara sederhana untuk mengatur sinkronisasi seperti ini:
func merge(cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // Jalankan goroutine untuk setiap kanal input lewat `cs`. // Fungsi `output` menyalin nilai dari `c` ke `out` sampai `c` ditutup, // terakhir memanggil `wg.Done`. output := func(c <-chan int) { for n := range c { out <- n } wg.Done() } wg.Add(len(cs)) for _, c := range cs { go output(c) } // Jalankan sebuah goroutine untuk menutup kanal keluar saat semua // goroutine `output` telah selesai. // Goroutine ini harus dimulai setelah pemanggilan `wg.Add`. go func() { wg.Wait() close(out) }() return out }
Berhenti dengan segera
Ada semacam pola dari fungsi-fungsi pipeline kita:
-
Setiap tahap menutup kanal keluar saat semua operasi pengiriman selesai.
-
Setiap tahap terus menerima nilai dari kanal masuk sampai kanal tersebut ditutup.
Pola ini membolehkan setiap tahap yang menerima nilai untuk dibuat sebagai
pengulangan range
dan memastikan bahwa semua goroutine selesai saat
semua nilai telah sukses dikirim ke hilir.
Namun, pada pipeline di dunia nyata, setiap tahap tidak selalu menerima semua nilai yang masuk. Terkadang memang dirancang seperti itu: si penerima hanya memerlukan sebagian dari nilai untuk melanjutkan pemrosesan. Sering kali, sebuah tahap selesai lebih awal karena nilai yang masuk merepresentasikan sebuah eror. Untuk setiap kasus tersebut, si penerima seharusnya tidak menunggu sampai semua nilai diterima, dan kita ingin supaya tahap sebelumnya berhenti mengirim nilai yang tahap berikutnya tidak butuhkan.
Pada contoh pipeline kita sebelumnya, jika tahap terakhir gagal mengonsumsi semua nilai yang masuk, maka goroutine yang mengirim nilai ke tahap terakhir akan pampat, misalnya pada contoh kode berikut:
// Konsumsi hanya nilai pertama dari `output`. out := merge(c1, c2) fmt.Println(<-out) // 4 or 9 return // Secara kita tidak mengambil nilai kedua dari `out`, salah satu // goroutine `output` akan pampat saat mencoba mengirim ke kanal. }
Hal ini menyebabkan adanya kebocoran sumber daya: goroutine mengonsumsi sumber daya memori dan runtime, dan referensi heap pada stack goroutine menyebabkan data tidak di-garbage collected. Goroutine tidak di garbage collected; mereka harus selesai dengan sendirinya.
Untuk itu kita perlu mengatur supaya setiap tahap dari hulu pipeline keluar dengan bersih walaupun tahap-tahap di hilir gagal menerima semua nilai yang masuk. Salah satu cara untuk menyelesaikan masalah ini yaitu dengan mengubah kanal keluar supaya memiliki buffer. Sebuah buffer dapat menyimpan sejumlah nilai; operasi pengiriman akan langsung selesai jika ada ruang yang tersedia dalam buffer:
c := make(chan int, 2) // buffer berukuran 2 c <- 1 // langsung sukses. c <- 2 // langsung sukses. c <- 3 // pampat sampai goroutine yang lain melakukan <-c dan menerima 1.
Saat jumlah nilai yang akan dikirim diketahui saat kanal dibuat, maka sebuah
buffer dapat menyederhanakan kode kita.
Contohnya, kita dapat menulis ulang fungsi gen
untuk menyalin semua nilai
integer ke dalam sebuah kanal dengan buffer dan menghindari pembuatan
goroutine yang baru:
func gen(nums ...int) <-chan int { out := make(chan int, len(nums)) for _, n := range nums { out <- n } close(out) return out }
Balik lagi ke goroutine yang pampat dalam pipeline kita, pertimbangkan
juga untuk menambahkan sebuah buffer ke kanal keluar yang dikembalikan
oleh fungsi merge
:
func merge(cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int, 1) // ruang yang cukup untuk input yang belum dibaca. // ... sisa kode lainnya tidak berubah ...
Walaupun hal ini memperbaiki goroutine yang pampat dalam program kita, kode
ini buruk.
Pilihan untuk ukuran buffer out
yaitu 1, karena kita mengetahui
jumlah nilai yang diterima oleh fungsi merge
dan jumlah nilai yang setiap
tahap hilir akan konsumsi.
Hal ini menyebabkannya rentan dengan kesalahan: jika kita mengirim nilai
tambahan ke fungsi gen
, atau jika tahap hilir membaca nilai yang lebih
sedikit, kita kembali mendapatkan goroutine yang pampat.
Untuk itu, kita membutuhkan suatu cara supaya setiap tahap di hilir mengindikasikan ke pengirim bahwa mereka akan berhenti menerima input.
Pembatalan eksplisit
Saat fungsi main
memutuskan untuk berhenti menerima nilai dari out
,
ia harus memberitahu goroutine pada tahap hulu untuk berhenti mengirim nilai.
Hal ini dapat dilakukan dengan mengirim sebuah nilai pada kanal bernama
done
.
Fungsi main
mengirim dua nilai ke kanal done
, secara ada potensi dua
pengirim yang akan pampat:
func main() { in := gen(2, 3) // Distribusi pekerjaan lewat dua goroutine sq yang membaca `in` yang // sama. c1 := sq(in) c2 := sq(in) // Konsumsi nilai pertama dari keluaran. done := make(chan struct{}, 2) out := merge(done, c1, c2) fmt.Println(<-out) // 4 atau 9 // Beritahu pengirim kita telah selesai menerima. done <- struct{}{} done <- struct{}{} }
Goroutine yang bertugas melakukan pengiriman mengganti operasi pengiriman
mereka dengan sebuah perintah select
yang mengirim sebuah nilai ke out
atau menerima sebuah nilai dari done
.
Tipe nilai dari kanal done
yaitu struct kosong karena nilainya tidak
diperlukan dalam kasus ini: yang diperlukan adalah kejadian menerima yang
mengindikasikan pengiriman ke out
sebaiknya ditinggalkan.
Goroutine output
terus membaca pada kanal masuk-nya, supaya tahap-tahap
di hulu tidak pampat.
(Kita akan bahas nanti bagaimana membuat pengulangan ini segera selesai.)
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // Buat goroutine `output` untuk setiap kanal input dalam `cs`. // Fungsi `output` menyalin nilai dari `c` ke `out` sampai `c` ditutup // atau menerima nilai dari `done`, lalu fungsi ini akan memanggil // `wg.Done`. output := func(c <-chan int) { for n := range c { select { case out <- n: case <-done: } } wg.Done() } // ... sisa kode selanjutnya tidak berubah ...
Pendekatan ini memiliki sebuah masalah: setiap penerima di hilir perlu mengetahui jumlah pengirim yang kemungkinan pampat dan mengatur supaya mengirim sinyal kepada pengirim tersebut. Mencatat semua perhitungan tersebut membutuhkan waktu dan bisa saja salah.
Kita memerlukan sebuah cara untuk memberitahu sejumlah goroutine, yang tidak diketahui jumlahnya, untuk berhenti mengirim nilai ke tahap di hilir. Pada Go, kita dapat melakukan hal ini dengan menutup kanal, karena operasi menerima pada kanal yang telah ditutup akan diproses langsung, menghasilkan nilai kosong dari tipe elemen dari kanal.
Hal ini berarti fungsi main
dapat membersihkan semua pengirim yang pampat
cukup dengan menutup kanal done
.
Penutupan kanal ini secara efektif menyiarkan sinyal ke semua pengirim.
Kita mengubah setiap fungsi pipeline untuk menerima done
sebagai
parameter dan mengatur supaya penutupan kanal terjadi lewat perintah defer
,
supaya semua nilai kembalian dari main
akan mengirim sinyal ke tahap-tahap
pada pipeline supaya berhenti.
func main() { // Buat kanal `done` yang digunakan oleh semua pipeline, dan tutup kanal // tersebut saat pipeline selesai, sebagai sinyal untuk semua goroutine // yang kita jalankan. done := make(chan struct{}) defer close(done) in := gen(done, 2, 3) // Distribusi pekerjaan lewat dua goroutine sq yang membaca `in` yang // sama. c1 := sq(done, in) c2 := sq(done, in) // Konsumsi nilai pertama dari hasil `merge`. out := merge(done, c1, c2) fmt.Println(<-out) // 4 atau 9 // Kanal `done` akan ditutup oleh pemanggilan `defer`. }
Setiap tahap pada pipeline sekarang bebas berhenti saat done
ditutup.
Fungsi merge
dapat selesai tanpa menghabiskan kanal masuk, secara ia
mengetahui bahwa pengirim dari hulu, sq
, akan berhenti mengirim saat
done
ditutup.
Fungsi output
memastikan wg.Done
dipanggil saat selesai lewat perintah
defer
:
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // Buat sebuah goroutine `output` untuk setiap kanal input dalam `cs`. // Fungsi `output` menyalin nilai dari `c` ke `out` sampai `c` atau `done` // ditutup, kemudian memanggil `wg.Done`. output := func(c <-chan int) { defer wg.Done() for n := range c { select { case out <- n: case <-done: return } } } // ... sisa kode selanjutnya tidak berubah ...
Dengan cara yang sama, fungsi sq
dapat berhenti saat kanal done
ditutup.
Fungsi sq
memastikan kanal out
ditutup saat keluar lewat perintah defer
:
func sq(done <-chan struct{}, in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { select { case out <- n * n: case <-done: return } } }() return out }
Berikut panduan untuk membuat pipeline:
-
Setiap tahap menutup kanal keluar saat semua operasi pengirim selesai.
-
Setiap tahap menerima nilai dari kanal masuk sampai kanal tersebut ditutup atau pengirim bebas dari pampat.
Pipeline membuka pengiriman yang terhenti baik lewat buffer atau secara eksplisit dengan mengirim sinyal ke pengirim saat penerima bisa meninggalkan kanal.
Mengurai isi direktori
Mari kita lihat pipeline yang lebih realistis.
MD5 adalah algoritma message-digest yang bisa digunakan untuk checksum
berkas.
Utilitas perintah md5sum
mencetak nilai digest dari daftar berkas.
% md5sum *.go d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go ee869afd31f83cbb2d10ee81b2b831dc parallel.go b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
Contoh program yang akan kita buat seperti md5sum
yang menerima sebuah
direktori sebagai argumen dan mencetak nilai digest untuk setiap berkas di
dalam direktori tersebut, diurut berdasarkan nama.
% go run serial.go . d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go ee869afd31f83cbb2d10ee81b2b831dc parallel.go b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
Fungsi main
dari program kita memanggil fungsi MD5All
, yang mengembalikan
sebuah map
yang berisi path dan nilai digest, kemudian mengurut dan
mencetak hasilnya:
func main() { // Hitung MD5 dari semua berkas di dalam direktori, kemudian cetak // hasilnya diurut berdasarkan nama. m, err := MD5All(os.Args[1]) if err != nil { fmt.Println(err) return } var paths []string for path := range m { paths = append(paths, path) } sort.Strings(paths) for _, path := range paths { fmt.Printf("%x %s\n", m[path], path) } }
Fungsi MD5All
adalah fokus dari diskusi kita sekarang.
Dalam berkas
serial.go
,
implementasinya tidak menggunakan konkurensi dan hanya membaca dan melakukan
sum dari setiap berkas saat membaca isi direktori.
// MD5All baca semua berkas dalam direktori dan kembalikan sebuah map yang // berisi path dan hasil MD5 sum dari isi berkas. // Jika pembacaan isi direktori gagal atau ada operasi pembacaan isi berkas // yang gagal, MD5All akan mengembalikan error. func MD5All(root string) (map[string][md5.Size]byte, error) { m := make(map[string][md5.Size]byte) err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.Mode().IsRegular() { return nil } data, err := ioutil.ReadFile(path) if err != nil { return err } m[path] = md5.Sum(data) return nil }) if err != nil { return nil, err } return m, nil }
Pembacaan secara paralel
Dalam
parallel.go
,
kita memecah MD5All
menjadi pipeline dengan dua tahap.
Tahap pertama, sumFiles
, membaca isi direktori, membaca isi berkas dalam
sebuah goroutine, dan mengirim hasilnya ke sebuah kanal dengan nilai dari tipe
result
:
type result struct { path string sum [md5.Size]byte err error }
Fungsi sumFiles
mengembalikan dua buah kanal: satu untuk kembalian dan satu
lagi untuk eror dari membaca isi direktori dengan filepath.Walk
.
Fungsi yang membaca isi direktori memulai sebuah goroutine baru untuk memroses
setiap berkas, kemudian mencek done
.
Jika done
ditutup, maka pembacaan isi direktori selesai segara:
func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) { // Untuk setiap berkas, jalankan sebuah goroutine yang menghitung _sum_ // dari berkas dan mengirim hasilnya ke `c`. // Mengirim hasil dari pembacaan direktori ke `errc`. c := make(chan result) errc := make(chan error, 1) go func() { var wg sync.WaitGroup err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.Mode().IsRegular() { return nil } wg.Add(1) go func() { data, err := ioutil.ReadFile(path) select { case c <- result{path, md5.Sum(data), err}: case <-done: } wg.Done() }() // Batalkan pembacaan direktori jika `done` ditutup. select { case <-done: return errors.New("walk canceled") default: return nil } }) // Pembacaan direktori telah selesai, sehingga semua pemanggilan // wg.Add telah dilakukan. // Jalankan sebuah goroutine untuk menutup `c` saat semua pengiriman // telah selesai. go func() { wg.Wait() close(c) }() // Perintah `select` tidak perlu di sini, secara `errc` menggunakan // _buffer_. errc <- err }() return c, errc }
Fungsi MD5All
menerima nilai digest dari c
.
MD5All
segera selesai bila ada eror, menutup done
lewat defer
:
func MD5All(root string) (map[string][md5.Size]byte, error) { // MD5All menutup kanal `done` saat selesai; ia bisa menutupnya sebelum // menerima semua nilai dari `c` dan `errc`. done := make(chan struct{}) defer close(done) c, errc := sumFiles(done, root) m := make(map[string][md5.Size]byte) for r := range c { if r.err != nil { return nil, r.err } m[r.path] = r.sum } if err := <-errc; err != nil { return nil, err } return m, nil }
Membatasi paralelisme
Implementasi MD5All
dalam
parallel.go
menjalankan sebuah goroutine baru untuk setiap berkas.
Dalam sebuah direktori dengan banyak berkas, hal ini bisa mengakibatkan
alokasi memori yang lebih banyak daripada memori pada sistem.
Kita dapat mengurangi alokasi ini dengan membatasi jumlah berkas yang dibaca
secara paralel.
Dalam
bounded.go
,
hal ini dilakukan dengan membuat sejumlah n
goroutine untuk membaca berkas.
Sekarang pipeline kita memiliki tiga tahap: baca isi direktori, baca dan
digest berkas, dan kumpulkan hasil digest.
Tahap pertama, walkFiles
, menghasilkan path dari berkas di dalam direktori:
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) { paths := make(chan string) errc := make(chan error, 1) go func() { // Tutup kanal `paths` setelah semua isi direktori dibaca. defer close(paths) // Tidak perlus `select`, secara `errc` memiliki buffer. errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.Mode().IsRegular() { return nil } select { case paths <- path: case <-done: return errors.New("walk canceled") } return nil }) }() return paths, errc }
Tahap kedua, digester
, menjalankan sejumlah goroutine digest yang menerima
nama berkas dari kanal paths
dan mengirim hasilnya ke kanal c
:
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) { for path := range paths { data, err := ioutil.ReadFile(path) select { case c <- result{path, md5.Sum(data), err}: case <-done: return } } }
Tidak seperti contoh sebelumnya, fungsi digester
tidak menutup kanal
keluar, karena beberapa goroutine mengirim ke sebuah kanal yang sama.
Namun, kode dalam MD5All
akan menutup kanal tersebut saat semua digester
selesai:
// Jalankan sejumlah goroutine untuk membaca dan men-_digest_ berkas. c := make(chan result) var wg sync.WaitGroup const numDigesters = 20 wg.Add(numDigesters) for i := 0; i < numDigesters; i++ { go func() { digester(done, paths, c) wg.Done() }() } go func() { wg.Wait() close(c) }()
Kita bisa saja membuat setiap fungsi digester
membuat dan mengembalikan
kanal keluar mereka sendiri, namun hal ini membutuhkan goroutine tambahan
untuk fan-in (menggabungkan) semua hasilnya.
Tahap terakhir menerima semua hasil dari c
kemudian memeriksa eror dari
errc
.
Pemeriksaan ini tidak bisa dilakukan lebih awal, secara walkFiles
bisa saja
menahan pengiriman nilai ke hilir:
m := make(map[string][md5.Size]byte) for r := range c { if r.err != nil { return nil, r.err } m[r.path] = r.sum } // Periksa apakah `Walk` gagal. if err := <-errc; err != nil { return nil, err } return m, nil }
Kesimpulan
Artikel ini telah menjelaskan beberapa teknik untuk membangun aliran data pipeline dengan Go. Berurusan dengan kegagalan pada pipeline sedikit kompleks, secara setiap tahap dalam pipeline bisa menahan mengirim nilai ke hilir, dan tahap selanjutnya bisa saja tidak memerlukan lagi data yang masuk. Kita telah memperlihatkan bagaimana menutup sebuah kanal dapat menyiarkan sebuah sinyal yang menandakan selesai ("done") ke semua goroutine yang dijalankan oleh pipeline dan mendefinisikan aturan-aturan untuk membangun pipeline secara benar.
Bacaan lebih lanjut:
-
Pola konkurensi Go (video) mempresentasikan dasar dari konkurensi Go dan cara pakainya.
-
Pola konkurensi Go lanjut (video) menelaah penggunaan yang lebih kompleks dari konkurensi pada Go, terutama
select
. -
Makalah Douglas McIlroy Squinting at Power Series memperlihatkan bagaimana konkurensi seperti Go menyediakan dukungan yang elegan untuk perhitungan yang kompleks.