GitHub repositorie: lapluma
La Pluma: A Lightweight Go Data Streaming Library
La Pluma is a compact, functional-style Go utility library that provides a simple and composable set of data processing tools.
It includes two core components: Iterator for sequential data processing and Pipe, which leverages Go’s channels and context for concurrent data processing.
Core Design
The library’s core is built around simplicity and composability. By offering a set of orthogonal, single-purpose operations (such as Map, Filter, and Reduce), it enables the construction of clear, readable, and powerful data processing pipelines.
Core Concepts
1. Iterator - Sequential Data Streams
Iterator is a standard iterator interface that defines element-by-element access to a data sequence.
Main Operations:
FromSlice(data []E) Iterator[E]: Creates an iterator from a slice.FromMap(data map[K]V) Iterator[Pair[K,V]]: Creates an iterator from a map.Map[E, R](it Iterator[E], handler func(E) R) Iterator[R]: Applies a non-error-prone transformation to each element.Filter[E](it Iterator[E], filter func(E) bool) Iterator[E]: Filters out elements that do not meet a condition.Reduce[E, R](it Iterator[E], handler func(R, E) R, initial R) R: Aggregates the sequence into a single value.Collect[E](it Iterator[E]) []E: Gathers all elements from an iterator into a slice.Iter[E any](it Iterator[E]) iter.Seq[E]andIter2[K, V any](it Iterator[lapluma.Pair[K, V]]) iter.Seq2[K, V]: Creates an iterator that conforms to theiterpackage, allowing it to be used with afor-rangeloop.
Example:
// Create an iterator
data := []int{1, 2, 3, 4, 5}
it := iterator.FromSlice(data)
// Chained operations
result := iterator.Collect(
iterator.Filter(
iterator.Map(it, func(x int) int { return x * 2 }),
func(x int) bool { return x > 5 }
)
) // [6, 8, 10]
// Or use a for-range loop to process the iterator result directly:
it := iterator.Filter(
iterator.Map(it, func(x int) int { return x * 2 }),
func(x int) bool { return x > 5 }
)
for data := range Iter(filteredIt) {
fmt.Printf("%v", data)
}
The Iterator is designed for sequential processing and is not concurrent-safe. If you require concurrency, convert it to a Pipe by calling pipe.FromIterator.
2. Pipe - Concurrent Data Streams
Pipe is the concurrent version of Iterator. It’s built on Go channels, and each operation (e.g., Map, Filter) runs in a separate group of goroutines, forming a processing pipeline.
All Pipe operations are integrated with context.Context for easy timeout control and graceful shutdown.
Main Operations:
FromSlice(data []E, ctx context.Context) *Pipe[E]: Creates a concurrent pipe from a slice.FromIterator(it iterator.Iterator[E], ctx context.Context) *Pipe[E]: Creates a concurrent pipe from an iterator.Map,Filter,Reduce, and other functions have the same functionality as theirIteratorcounterparts, but they execute concurrently.
Example:
ctx := context.Background()
// Create a concurrent pipe
p := pipe.FromSlice([]int{1, 2, 3, 4, 5}, ctx)
// Concurrent processing (3 worker goroutines)
result := pipe.Collect(
pipe.Filter(
pipe.Map(p, cpuIntensiveTask, 3), // Concurrency level 3
func(x int) bool { return x > 10 },
2, // Concurrency level 2
)
)
Integration with Standard Iterators
import "iter"
// Convert to a standard iterator
seq := iterator.Iter(myIterator)
for value := range seq {
// Process the value
}
// Key-value pair iteration
seq2 := iterator.Iter2(mapIterator)
for k, v := range seq2 {
// Process the key and value
}
Error Handling
La Pluma intentionally simplifies the signatures of core transformation functions, such as Map’s handler being func(T) R instead of func(T) (R, error). This is a design choice to treat errors as part of the data stream.
The following two patterns are recommended for handling operations that might fail:
Pattern 1: Pre-filtering
If some data is invalid from the start or doesn’t meet the processing criteria, use Filter to remove it before it enters the core processing logic.
// Example: Process only positive numbers
pipe := FromSlice([]int{1, -2, 3, -4}, ctx)
positivePipe := Filter(pipe, func(n int) bool {
return n > 0
})
// ... subsequent operations will only see {1, 3}
Pattern 2: Using TryMap for Fallible Transformations
When the data transformation itself can fail (e.g., parsing a string, calling an external API), use the TryMap function. Its handler signature is func(T) (R, error). When the handler returns a non-nil error, TryMap automatically skips (discards) that element and continues processing the next one. This allows the pipeline to keep running even when encountering “data-level” errors without being interrupted.
import (
"strconv"
"errors"
)
// Example: Convert strings to integers, skip on failure
stringPipe := FromSlice([]string{"1", "two", "3", "four"}, ctx)
// Use TryMap; the handler returns (int, error)
intPipe := TryMap(stringPipe, func(s string) (int, error) {
i, err := strconv.Atoi(s)
if err != nil {
// Return an error, and this element will be discarded
return 0, errors.New("not a number")
}
return i, nil
})
// The final Reduce operation will only process the successfully converted {1, 3}
sum := Reduce(intPipe, func(acc, n int) int { return acc + n }, 0)
// The result of sum is 4
Other
If you need to collect errors during the Map process, consider using Result[T] from util.go as the return value.
Installation
go get github.com/muzhy/lapluma
Running Tests
go test ./...
Q&A
Q: Why use a nested approach instead of a chained one?
A: lapluma is built on generics, which reduces runtime overhead and allows for type-safe checks during compilation. However, Go’s generics mechanism does not support type parameters for methods. Therefore, functions like Map cannot be implemented as methods and must use a nested approach. To maintain a consistent interface, all functions are implemented in a nested style.
Future Plans
- Provide more transformation operations, such as
Distinct,Zip, andPeek. - Improve the error handling mechanism.
- Standardize the concurrency control parameters for
Pipe.