From beb00ad0786b8990834c913aa3bae149cbc737c7 Mon Sep 17 00:00:00 2001 From: SuperK Date: Wed, 27 May 2026 10:22:35 +0800 Subject: [PATCH] First init --- go.mod | 3 +++ pipeline.go | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+) create mode 100644 go.mod create mode 100644 pipeline.go diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..0358b1a --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module git.keung.cc/pkg/pipeline + +go 1.25.6 diff --git a/pipeline.go b/pipeline.go new file mode 100644 index 0000000..7814c9c --- /dev/null +++ b/pipeline.go @@ -0,0 +1,36 @@ +package pipeline + +type Next[T any] func(T) error + +type Pipe[T any] func(T, Next[T]) error + +type Pipeline[T any] struct { + passable T + pipes []Pipe[T] +} + +func Send[T any](passable T) *Pipeline[T] { + return (&Pipeline[T]{}).Send(passable) +} + +func (p *Pipeline[T]) Send(passable T) *Pipeline[T] { + p.passable = passable + return p +} + +func (p *Pipeline[T]) Through(pipes ...Pipe[T]) *Pipeline[T] { + p.pipes = pipes + return p +} + +func (p *Pipeline[T]) Then(destination Next[T]) error { + next := destination + for i := len(p.pipes) - 1; i >= 0; i-- { + pipe := p.pipes[i] + currentNext := next + next = func(passable T) error { + return pipe(passable, currentNext) + } + } + return next(p.passable) +}