La Pluma: A Lightweight Go Data Streaming Library

Tuesday, Jul 22, 2025 | 2 minute read | Updated at Tuesday, Jul 22, 2025

@

GitHub repositorie: lapluma

La Pluma: A Lightweight Go Data Streaming Library

La Pluma is a compact, functional-style Go utility library that provides a simple and composable set of data processing tools. It includes two core components: Iterator for sequential data processing and Pipe, which leverages Go’s channels and context for concurrent data processing.

Core Design

The library’s core is built around simplicity and composability. By offering a set of orthogonal, single-purpose operations (such as Map, Filter, and Reduce), it enables the construction of clear, readable, and powerful data processing pipelines.

Core Concepts

1. Iterator - Sequential Data Streams

Iterator is a standard iterator interface that defines element-by-element access to a data sequence.

Main Operations:

  • FromSlice(data []E) Iterator[E]: Creates an iterator from a slice.
  • FromMap(data map[K]V) Iterator[Pair[K,V]]: Creates an iterator from a map.
  • Map[E, R](it Iterator[E], handler func(E) R) Iterator[R]: Applies a non-error-prone transformation to each element.
  • Filter[E](it Iterator[E], filter func(E) bool) Iterator[E]: Filters out elements that do not meet a condition.
  • Reduce[E, R](it Iterator[E], handler func(R, E) R, initial R) R: Aggregates the sequence into a single value.
  • Collect[E](it Iterator[E]) []E: Gathers all elements from an iterator into a slice.
  • Iter[E any](it Iterator[E]) iter.Seq[E] and Iter2[K, V any](it Iterator[lapluma.Pair[K, V]]) iter.Seq2[K, V]: Creates an iterator that conforms to the iter package, allowing it to be used with a for-range loop.

Example:

// Create an iterator
data := []int{1, 2, 3, 4, 5}
it := iterator.FromSlice(data)

// Chained operations
result := iterator.Collect(
    iterator.Filter(
        iterator.Map(it, func(x int) int { return x * 2 }),
        func(x int) bool { return x > 5 }
    )
) // [6, 8, 10]

// Or use a for-range loop to process the iterator result directly:
it := iterator.Filter(
    iterator.Map(it, func(x int) int { return x * 2 }),
    func(x int) bool { return x > 5 }
)
for data := range Iter(filteredIt) {
    fmt.Printf("%v", data)
}

The Iterator is designed for sequential processing and is not concurrent-safe. If you require concurrency, convert it to a Pipe by calling pipe.FromIterator.

2. Pipe - Concurrent Data Streams

Pipe is the concurrent version of Iterator. It’s built on Go channels, and each operation (e.g., Map, Filter) runs in a separate group of goroutines, forming a processing pipeline.

All Pipe operations are integrated with context.Context for easy timeout control and graceful shutdown.

Main Operations:

  • FromSlice(data []E, ctx context.Context) *Pipe[E]: Creates a concurrent pipe from a slice.
  • FromIterator(it iterator.Iterator[E], ctx context.Context) *Pipe[E]: Creates a concurrent pipe from an iterator.
  • Map, Filter, Reduce, and other functions have the same functionality as their Iterator counterparts, but they execute concurrently.

Example:

ctx := context.Background()

// Create a concurrent pipe
p := pipe.FromSlice([]int{1, 2, 3, 4, 5}, ctx)

// Concurrent processing (3 worker goroutines)
result := pipe.Collect(
    pipe.Filter(
        pipe.Map(p, cpuIntensiveTask, 3), // Concurrency level 3
        func(x int) bool { return x > 10 },
        2, // Concurrency level 2
    )
)

Integration with Standard Iterators

import "iter"

// Convert to a standard iterator
seq := iterator.Iter(myIterator)
for value := range seq {
    // Process the value
}

// Key-value pair iteration
seq2 := iterator.Iter2(mapIterator)
for k, v := range seq2 {
    // Process the key and value
}

Error Handling

La Pluma intentionally simplifies the signatures of core transformation functions, such as Map’s handler being func(T) R instead of func(T) (R, error). This is a design choice to treat errors as part of the data stream.

The following two patterns are recommended for handling operations that might fail:

Pattern 1: Pre-filtering

If some data is invalid from the start or doesn’t meet the processing criteria, use Filter to remove it before it enters the core processing logic.

// Example: Process only positive numbers
pipe := FromSlice([]int{1, -2, 3, -4}, ctx)
positivePipe := Filter(pipe, func(n int) bool {
    return n > 0
})
// ... subsequent operations will only see {1, 3}

Pattern 2: Using TryMap for Fallible Transformations

When the data transformation itself can fail (e.g., parsing a string, calling an external API), use the TryMap function. Its handler signature is func(T) (R, error). When the handler returns a non-nil error, TryMap automatically skips (discards) that element and continues processing the next one. This allows the pipeline to keep running even when encountering “data-level” errors without being interrupted.

import (
    "strconv"
    "errors"
)

// Example: Convert strings to integers, skip on failure
stringPipe := FromSlice([]string{"1", "two", "3", "four"}, ctx)

// Use TryMap; the handler returns (int, error)
intPipe := TryMap(stringPipe, func(s string) (int, error) {
    i, err := strconv.Atoi(s)
    if err != nil {
        // Return an error, and this element will be discarded
        return 0, errors.New("not a number")
    }
    return i, nil
})

// The final Reduce operation will only process the successfully converted {1, 3}
sum := Reduce(intPipe, func(acc, n int) int { return acc + n }, 0)
// The result of sum is 4

Other

If you need to collect errors during the Map process, consider using Result[T] from util.go as the return value.


Installation

go get github.com/muzhy/lapluma

Running Tests

go test ./...

Q&A

Q: Why use a nested approach instead of a chained one?

A: lapluma is built on generics, which reduces runtime overhead and allows for type-safe checks during compilation. However, Go’s generics mechanism does not support type parameters for methods. Therefore, functions like Map cannot be implemented as methods and must use a nested approach. To maintain a consistent interface, all functions are implemented in a nested style.


Future Plans

  • Provide more transformation operations, such as Distinct, Zip, and Peek.
  • Improve the error handling mechanism.
  • Standardize the concurrency control parameters for Pipe.

© 2023 - 2025 muzhy's blog

🌱 Powered by Hugo with theme Dream.

About Me

A programer