Pendahuluan

Primitif konkurensi pada Go mempermudah kita membangun aliran data pipeline yang menggunakan I/O dan CPU dengan efisien. Artikel ini menampilkan contoh-contoh dari pipeline tersebut, menyoroti kesalahan yang mungkin muncul saat operasi gagal, dan memperkenalkan teknik-teknik untuk berurusan dengan kegagalan secara bersih.

Apa itu pipeline?

Tidak ada definisi formal dari sebuah pipeline dalam Go; ia adalah salah satu dari banyak jenis program yang konkuren. Secara informal, sebuah pipeline adalah suatu urutan tahap-tahap yang dihubungkan oleh kanal, yang mana setiap tahap adalah sebuah grup dari goroutine yang menjalankan fungsi yang sama. Dalam setiap tahap, setiap goroutine

  • menerima nilai dari hulu lewat kanal masuk

  • memroses data lewat fungsi, biasanya menghasilkan nilai baru

  • mengirim nilai ke hilir lewat kanal keluar

Setiap tahap memiliki sejumlah kanal masuk dan keluar, kecuali tahap yang pertama dan terakhir, yang mana hanya memiliki kanal keluar atau masuk, secara berurutan. Tahap yang pertama biasanya disebut dengan sumber atau produser; tahap yang terakhir biasanya disebut dengan sink atau konsumer.

Kita akan memulai dengan sebuah contoh pipeline sederhana untuk menjelaskan ide-ide dan teknik-teknik tersebut. Nanti, kita akan memperlihatkan contoh yang lebih nyata.

Memangkatkan bilangan

Bayangkan sebuah pipeline dengan tiga tahap.

Tahap pertama, gen, yaitu sebuah fungsi yang mengonversi deretan menjadi menjadi sebuah kanal. Fungsi gen menjalankan sebuah goroutine yang mengirim setiap parameter integer yang ia terima ke dalam sebuah kanal dan menutup kanal tersebut saat semua nilai telah dikirim.

func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}

Tahap kedua, sq, menerima sejumlah nilai integer dari sebuah kanal dan mengembalikan sebuah kanal yang berisi pangkat dari setiap integer yang ia terima. Setelah kanal masuk ditutup (yang berarti semua nilai integer telah diterima) dan tahap ini telah mengirim semua nilai pangkat ke hilir, ia akan menutup kanal keluar.

func sq(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * n
		}
		close(out)
	}()
	return out
}

Fungsi main menyiapkan pipeline dan menjalankan tahap yang terakhir: menerima nilai dari tahap kedua dan mencetaknya satu per satu, sampai kanal ditutup.

func main() {
	// Siapkan pipeline.
	c := gen(2, 3)
	out := sq(c)

	// Konsumsi kanal keluar.
	fmt.Println(<-out) // 4
	fmt.Println(<-out) // 9
}

Secara sq memiliki tipe kanal yang sama untuk yang masuk dan keluar, kita dapat menulisnya beberapa kali. Kita juga dapat menulis fungsi main sebagai pengulangan dengan range, seperti pada tahap-tahap lainnya:

func main() {
	// Siapkan pipeline dan konsumsi kanal keluar.
	for n := range sq(sq(gen(2, 3))) {
		fmt.Println(n) // 16 kemudian 81
	}
}

Fan-out, fan-in

Beberapa fungsi dapat membaca dari kanal yang sama sampai kanal tersebut ditutup; hal ini disebut dengan fan-out. Cara ini membolehkan distribusi kerja antara sekelompok goroutine supaya penggunaan CPU dan I/O paralel.

Sebuah fungsi dapat membaca dari beberapa input sampai semua input ditutup dengan cara menggabungkan semua kanal input menjadi kanal tunggal yang ditutup saat semua input telah ditutup. Cara ini disebut dengan fan-in.

Kita dapat mengubah pipeline sebelumnya untuk menjalankan dua fungsi sq, yang membaca dari kanal input yang sama. Untuk itu, kita perlu membuat sebuah fungsi baru, merge, yang menggabungkan (fan-in) semua hasil dari sq:

func main() {
	in := gen(2, 3)

	// Distribusi pekerjaan lewat dua goroutine `sq` yang membaca `in` yang
	// sama.
	c1 := sq(in)
	c2 := sq(in)

	// Konsumsi gabungan keluaran dari c1 dan c2.
	for n := range merge(c1, c2) {
		fmt.Println(n) // 4 lalu 9, atau 9 lalu 4
	}
}

Fungsi merge mengonversi sejumlah kanal menjadi sebuah kanal tunggal dengan menjalankan sebuah goroutine untuk setiap kanal input dan menyalin nilainya ke sebuah kanal keluar tunggal. Saat semua goroutine telah dimulai, fungsi merge menjalankan lagi sebuah goroutine untuk menutup kanal keluar saat semua pengiriman ke kanal keluar tersebut selesai.

Pengiriman ke kanal yang telah ditutup akan menyebabkan panic, jadi sangat penting untuk memastikan semua pengiriman telah selesai sebelum menutup kanal keluar tersebut. Tipe sync.WaitGroup menyediakan cara sederhana untuk mengatur sinkronisasi seperti ini:

func merge(cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	// Jalankan goroutine untuk setiap kanal input lewat `cs`.
	// Fungsi `output` menyalin nilai dari `c` ke `out` sampai `c` ditutup,
	// terakhir memanggil `wg.Done`.
	output := func(c <-chan int) {
		for n := range c {
			out <- n
		}
		wg.Done()
	}
	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}

	// Jalankan sebuah goroutine untuk menutup kanal keluar saat semua
	// goroutine `output` telah selesai.
	// Goroutine ini harus dimulai setelah pemanggilan `wg.Add`.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

Berhenti dengan segera

Ada semacam pola dari fungsi-fungsi pipeline kita:

  • Setiap tahap menutup kanal keluar saat semua operasi pengiriman selesai.

  • Setiap tahap terus menerima nilai dari kanal masuk sampai kanal tersebut ditutup.

Pola ini membolehkan setiap tahap yang menerima nilai untuk dibuat sebagai pengulangan range dan memastikan bahwa semua goroutine selesai saat semua nilai telah sukses dikirim ke hilir.

Namun, pada pipeline di dunia nyata, setiap tahap tidak selalu menerima semua nilai yang masuk. Terkadang memang dirancang seperti itu: si penerima hanya memerlukan sebagian dari nilai untuk melanjutkan pemrosesan. Sering kali, sebuah tahap selesai lebih awal karena nilai yang masuk merepresentasikan sebuah eror. Untuk setiap kasus tersebut, si penerima seharusnya tidak menunggu sampai semua nilai diterima, dan kita ingin supaya tahap sebelumnya berhenti mengirim nilai yang tahap berikutnya tidak butuhkan.

Pada contoh pipeline kita sebelumnya, jika tahap terakhir gagal mengonsumsi semua nilai yang masuk, maka goroutine yang mengirim nilai ke tahap terakhir akan pampat, misalnya pada contoh kode berikut:

	// Konsumsi hanya nilai pertama dari `output`.
	out := merge(c1, c2)
	fmt.Println(<-out) // 4 or 9
	return
	// Secara kita tidak mengambil nilai kedua dari `out`, salah satu
	// goroutine `output` akan pampat saat mencoba mengirim ke kanal.
}

Hal ini menyebabkan adanya kebocoran sumber daya: goroutine mengonsumsi sumber daya memori dan runtime, dan referensi heap pada stack goroutine menyebabkan data tidak di-garbage collected. Goroutine tidak di garbage collected; mereka harus selesai dengan sendirinya.

Untuk itu kita perlu mengatur supaya setiap tahap dari hulu pipeline keluar dengan bersih walaupun tahap-tahap di hilir gagal menerima semua nilai yang masuk. Salah satu cara untuk menyelesaikan masalah ini yaitu dengan mengubah kanal keluar supaya memiliki buffer. Sebuah buffer dapat menyimpan sejumlah nilai; operasi pengiriman akan langsung selesai jika ada ruang yang tersedia dalam buffer:

c := make(chan int, 2) // buffer berukuran 2
c <- 1  // langsung sukses.
c <- 2  // langsung sukses.
c <- 3  // pampat sampai goroutine yang lain melakukan <-c dan menerima 1.

Saat jumlah nilai yang akan dikirim diketahui saat kanal dibuat, maka sebuah buffer dapat menyederhanakan kode kita. Contohnya, kita dapat menulis ulang fungsi gen untuk menyalin semua nilai integer ke dalam sebuah kanal dengan buffer dan menghindari pembuatan goroutine yang baru:

func gen(nums ...int) <-chan int {
	out := make(chan int, len(nums))
	for _, n := range nums {
		out <- n
	}
	close(out)
	return out
}

Balik lagi ke goroutine yang pampat dalam pipeline kita, pertimbangkan juga untuk menambahkan sebuah buffer ke kanal keluar yang dikembalikan oleh fungsi merge:

func merge(cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int, 1) // ruang yang cukup untuk input yang belum dibaca.
	// ... sisa kode lainnya tidak berubah ...

Walaupun hal ini memperbaiki goroutine yang pampat dalam program kita, kode ini buruk. Pilihan untuk ukuran buffer out yaitu 1, karena kita mengetahui jumlah nilai yang diterima oleh fungsi merge dan jumlah nilai yang setiap tahap hilir akan konsumsi. Hal ini menyebabkannya rentan dengan kesalahan: jika kita mengirim nilai tambahan ke fungsi gen, atau jika tahap hilir membaca nilai yang lebih sedikit, kita kembali mendapatkan goroutine yang pampat.

Untuk itu, kita membutuhkan suatu cara supaya setiap tahap di hilir mengindikasikan ke pengirim bahwa mereka akan berhenti menerima input.

Pembatalan eksplisit

Saat fungsi main memutuskan untuk berhenti menerima nilai dari out, ia harus memberitahu goroutine pada tahap hulu untuk berhenti mengirim nilai. Hal ini dapat dilakukan dengan mengirim sebuah nilai pada kanal bernama done. Fungsi main mengirim dua nilai ke kanal done, secara ada potensi dua pengirim yang akan pampat:

func main() {
	in := gen(2, 3)

	// Distribusi pekerjaan lewat dua goroutine sq yang membaca `in` yang
	// sama.
	c1 := sq(in)
	c2 := sq(in)

	// Konsumsi nilai pertama dari keluaran.
	done := make(chan struct{}, 2)
	out := merge(done, c1, c2)
	fmt.Println(<-out) // 4 atau 9

	// Beritahu pengirim kita telah selesai menerima.
	done <- struct{}{}
	done <- struct{}{}
}

Goroutine yang bertugas melakukan pengiriman mengganti operasi pengiriman mereka dengan sebuah perintah select yang mengirim sebuah nilai ke out atau menerima sebuah nilai dari done. Tipe nilai dari kanal done yaitu struct kosong karena nilainya tidak diperlukan dalam kasus ini: yang diperlukan adalah kejadian menerima yang mengindikasikan pengiriman ke out sebaiknya ditinggalkan. Goroutine output terus membaca pada kanal masuk-nya, supaya tahap-tahap di hulu tidak pampat. (Kita akan bahas nanti bagaimana membuat pengulangan ini segera selesai.)

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	// Buat goroutine `output` untuk setiap kanal input dalam `cs`.
	// Fungsi `output` menyalin nilai dari `c` ke `out` sampai `c` ditutup
	// atau menerima nilai dari `done`, lalu fungsi ini akan memanggil
	// `wg.Done`.
	output := func(c <-chan int) {
		for n := range c {
			select {
			case out <- n:
			case <-done:
			}
		}
		wg.Done()
	}
	// ... sisa kode selanjutnya tidak berubah ...

Pendekatan ini memiliki sebuah masalah: setiap penerima di hilir perlu mengetahui jumlah pengirim yang kemungkinan pampat dan mengatur supaya mengirim sinyal kepada pengirim tersebut. Mencatat semua perhitungan tersebut membutuhkan waktu dan bisa saja salah.

Kita memerlukan sebuah cara untuk memberitahu sejumlah goroutine, yang tidak diketahui jumlahnya, untuk berhenti mengirim nilai ke tahap di hilir. Pada Go, kita dapat melakukan hal ini dengan menutup kanal, karena operasi menerima pada kanal yang telah ditutup akan diproses langsung.

Hal ini berarti fungsi main dapat membersihkan semua pengirim yang pampat cukup dengan menutup kanal done. Penutupan kanal ini secara efektif menyiarkan sinyal ke semua pengirim. Kita mengubah setiap fungsi pipeline untuk menerima done sebagai parameter dan mengatur supaya penutupan kanal terjadi lewat perintah defer, supaya semua nilai kembalian dari main akan mengirim sinyal ke tahap-tahap pada pipeline supaya berhenti.

func main() {
	// Buat kanal `done` yang digunakan oleh semua pipeline, dan tutup kanal
	// tersebut saat pipeline selesai, sebagai sinyal untuk semua goroutine
	// yang kita jalankan.
	done := make(chan struct{})
	defer close(done)

	in := gen(done, 2, 3)

	// Distribusi pekerjaan lewat dua goroutine sq yang membaca `in` yang
	// sama.
	c1 := sq(done, in)
	c2 := sq(done, in)

	// Konsumsi nilai pertama dari hasil `merge`.
	out := merge(done, c1, c2)
	fmt.Println(<-out) // 4 atau 9

	// Kanal `done` akan ditutup oleh pemanggilan `defer`.
}

Setiap tahap pada pipeline sekarang bebas berhenti saat done ditutup. Fungsi merge dapat selesai tanpa menghabiskan kanal masuk, secara ia mengetahui bahwa pengirim dari hulu, sq, akan berhenti mengirim saat done ditutup. Fungsi output memastikan wg.Done dipanggil saat selesai lewat perintah defer:

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	// Buat sebuah goroutine `output` untuk setiap kanal input dalam `cs`.
	// Fungsi `output` menyalin nilai dari `c` ke `out` sampai `c` atau `done`
	// ditutup, kemudian memanggil `wg.Done`.
	output := func(c <-chan int) {
		defer wg.Done()
		for n := range c {
			select {
			case out <- n:
			case <-done:
				return
			}
		}
	}
	// ... sisa kode selanjutnya tidak berubah ...

Dengan cara yang sama, fungsi sq dapat berhenti saat kanal done ditutup. Fungsi sq memastikan kanal out ditutup saat keluar lewat perintah defer:

func sq(done <-chan struct{}, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case out <- n * n:
			case <-done:
				return
			}
		}
	}()
	return out
}

Berikut panduan untuk membuat pipeline:

  • Setiap tahap menutup kanal keluar saat semua operasi pengirim selesai.

  • Setiap tahap menerima nilai dari kanal masuk sampai kanal tersebut ditutup atau pengirim bebas dari pampat.

Pipeline membuka pengiriman yang terhenti baik lewat buffer atau secara eksplisit dengan mengirim sinyal ke pengirim saat penerima bisa meninggalkan kanal.

Mengurai isi direktori

Mari kita lihat pipeline yang lebih realistis.

MD5 adalah algoritma message-digest yang bisa digunakan untuk checksum berkas. Utilitas perintah md5sum mencetak nilai digest dari daftar berkas.

% md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

Contoh program yang akan kita buat seperti md5sum yang menerima sebuah direktori sebagai argumen dan mencetak nilai digest untuk setiap berkas di dalam direktori tersebut, diurut berdasarkan nama.

% go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

Fungsi main dari program kita memanggil fungsi MD5All, yang mengembalikan sebuah map yang berisi path dan nilai digest, kemudian mengurut dan mencetak hasilnya:

func main() {
	// Hitung MD5 dari semua berkas di dalam direktori, kemudian cetak
	// hasilnya diurut berdasarkan nama.
	m, err := MD5All(os.Args[1])
	if err != nil {
		fmt.Println(err)
		return
	}
	var paths []string
	for path := range m {
		paths = append(paths, path)
	}
	sort.Strings(paths)
	for _, path := range paths {
		fmt.Printf("%x  %s\n", m[path], path)
	}
}

Fungsi MD5All adalah fokus dari diskusi kita sekarang. Dalam berkas serial.go, implementasinya tidak menggunakan konkurensi dan hanya membaca dan melakukan sum dari setiap berkas saat membaca isi direktori.

// MD5All baca semua berkas dalam direktori dan kembalikan sebuah map yang
// berisi path dan hasil MD5 sum dari isi berkas.
// Jika pembacaan isi direktori gagal atau ada operasi pembacaan isi berkas
// yang gagal, MD5All akan mengembalikan error.
func MD5All(root string) (map[string][md5.Size]byte, error) {
	m := make(map[string][md5.Size]byte)
	err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if !info.Mode().IsRegular() {
			return nil
		}
		data, err := ioutil.ReadFile(path)
		if err != nil {
			return err
		}
		m[path] = md5.Sum(data)
		return nil
	})
	if err != nil {
		return nil, err
	}
	return m, nil
}

Pembacaan secara paralel

Dalam parallel.go, kita memecah MD5All menjadi pipeline dengan dua tahap. Tahap pertama, sumFiles, membaca isi direktori, membaca isi berkas dalam sebuah goroutine, dan mengirim hasilnya ke sebuah kanal dengan nilai dari tipe result:

type result struct {
	path string
	sum  [md5.Size]byte
	err  error
}

Fungsi sumFiles mengembalikan dua buah kanal: satu untuk kembalian dan satu lagi untuk eror dari membaca isi direktori dengan filepath.Walk. Fungsi yang membaca isi direktori memulai sebuah goroutine baru untuk memroses setiap berkas, kemudian mencek done. Jika done ditutup, maka pembacaan isi direktori selesai segara:

func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
	// Untuk setiap berkas, jalankan sebuah goroutine yang menghitung _sum_
	// dari berkas dan mengirim hasilnya ke `c`.
	// Mengirim hasil dari pembacaan direktori ke `errc`.
	c := make(chan result)
	errc := make(chan error, 1)
	go func() {
		var wg sync.WaitGroup
		err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			if !info.Mode().IsRegular() {
				return nil
			}
			wg.Add(1)
			go func() {
				data, err := ioutil.ReadFile(path)
				select {
				case c <- result{path, md5.Sum(data), err}:
				case <-done:
				}
				wg.Done()
			}()
			// Batalkan pembacaan direktori jika `done` ditutup.
			select {
			case <-done:
				return errors.New("walk canceled")
			default:
				return nil
			}
		})
		// Pembacaan direktori telah selesai, sehingga semua pemanggilan
		// wg.Add telah dilakukan.
		// Jalankan sebuah goroutine untuk menutup `c` saat semua pengiriman
		// telah selesai.
		go func() {
			wg.Wait()
			close(c)
		}()
		// Perintah `select` tidak perlu di sini, secara `errc` menggunakan
		// _buffer_.
		errc <- err
	}()
	return c, errc
}

Fungsi MD5All menerima nilai digest dari c. MD5All segera selesai bila ada eror, menutup done lewat defer:

func MD5All(root string) (map[string][md5.Size]byte, error) {
	// MD5All menutup kanal `done` saat selesai; ia bisa menutupnya sebelum
	// menerima semua nilai dari `c` dan `errc`.
	done := make(chan struct{})
	defer close(done)

	c, errc := sumFiles(done, root)

	m := make(map[string][md5.Size]byte)
	for r := range c {
		if r.err != nil {
			return nil, r.err
		}
		m[r.path] = r.sum
	}
	if err := <-errc; err != nil {
		return nil, err
	}
	return m, nil
}

Membatasi paralelisme

Implementasi MD5All dalam parallel.go menjalankan sebuah goroutine baru untuk setiap berkas. Dalam sebuah direktori dengan banyak berkas, hal ini bisa mengakibatkan alokasi memori yang lebih banyak daripada memori pada sistem.

Kita dapat mengurangi alokasi ini dengan membatasi jumlah berkas yang dibaca secara paralel. Dalam bounded.go, hal ini dilakukan dengan membuat sejumlah n goroutine untuk membaca berkas. Sekarang pipeline kita memiliki tiga tahap: baca isi direktori, baca dan digest berkas, dan kumpulkan hasil digest.

Tahap pertama, walkFiles, menghasilkan path dari berkas di dalam direktori:

func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
	paths := make(chan string)
	errc := make(chan error, 1)
	go func() {
		// Tutup kanal `paths` setelah semua isi direktori dibaca.
		defer close(paths)
		// Tidak perlus `select`, secara `errc` memiliki buffer.
		errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			if !info.Mode().IsRegular() {
				return nil
			}
			select {
			case paths <- path:
			case <-done:
				return errors.New("walk canceled")
			}
			return nil
		})
	}()
	return paths, errc
}

Tahap kedua, digester, menjalankan sejumlah goroutine digest yang menerima nama berkas dari kanal paths dan mengirim hasilnya ke kanal c:

func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
	for path := range paths {
		data, err := ioutil.ReadFile(path)
		select {
		case c <- result{path, md5.Sum(data), err}:
		case <-done:
			return
		}
	}
}

Tidak seperti contoh sebelumnya, fungsi digester tidak menutup kanal keluar, karena beberapa goroutine mengirim ke sebuah kanal yang sama. Namun, kode dalam MD5All akan menutup kanal tersebut saat semua digester selesai:

	// Jalankan sejumlah goroutine untuk membaca dan men-_digest_ berkas.
	c := make(chan result)
	var wg sync.WaitGroup
	const numDigesters = 20
	wg.Add(numDigesters)
	for i := 0; i < numDigesters; i++ {
		go func() {
			digester(done, paths, c)
			wg.Done()
		}()
	}
	go func() {
		wg.Wait()
		close(c)
	}()

Kita bisa saja membuat setiap fungsi digester membuat dan mengembalikan kanal keluar mereka sendiri, namun hal ini membutuhkan goroutine tambahan untuk fan-in (menggabungkan) semua hasilnya.

Tahap terakhir menerima semua hasil dari c kemudian memeriksa eror dari errc. Pemeriksaan ini tidak bisa dilakukan lebih awal, secara walkFiles bisa saja menahan pengiriman nilai ke hilir:

	m := make(map[string][md5.Size]byte)
	for r := range c {
		if r.err != nil {
			return nil, r.err
		}
		m[r.path] = r.sum
	}
	// Periksa apakah `Walk` gagal.
	if err := <-errc; err != nil {
		return nil, err
	}
	return m, nil
}

Kesimpulan

Artikel ini telah menjelaskan beberapa teknik untuk membangun aliran data pipeline dengan Go. Berurusan dengan kegagalan pada pipeline sedikit kompleks, secara setiap tahap dalam pipeline bisa menahan mengirim nilai ke hilir, dan tahap selanjutnya bisa saja tidak memerlukan lagi data yang masuk. Kita telah memperlihatkan bagaimana menutup sebuah kanal dapat menyiarkan sebuah sinyal yang menandakan selesai ("done") ke semua goroutine yang dijalankan oleh pipeline dan mendefinisikan aturan-aturan untuk membangun pipeline secara benar.

Bacaan lebih lanjut: