Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: session sum example #111

Merged
merged 2 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 5 additions & 13 deletions pkg/reducer/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ func (rtm *reduceTaskManager) CreateTask(ctx context.Context, request *v1.Reduce
// write the output to the output channel, service will forward it to downstream
rtm.responseCh <- task.buildReduceResponse(message)
}
// send EOF
rtm.responseCh <- task.buildEOFResponse()
// close the output channel after the reduce function is done
close(task.outputCh)
// send a done signal
Expand Down Expand Up @@ -133,27 +131,21 @@ func (rtm *reduceTaskManager) OutputChannel() <-chan *v1.ReduceResponse {

// WaitAll waits for all the reduce tasks to complete.
func (rtm *reduceTaskManager) WaitAll() {
tasks := make([]*reduceTask, 0, len(rtm.tasks))
var eofResponse *v1.ReduceResponse
for _, task := range rtm.tasks {
tasks = append(tasks, task)
}

for _, task := range tasks {
<-task.doneCh
if eofResponse == nil {
eofResponse = task.buildEOFResponse()
}
}

rtm.responseCh <- eofResponse
// after all the tasks are completed, close the output channel
close(rtm.responseCh)
}

// CloseAll closes all the reduce tasks.
func (rtm *reduceTaskManager) CloseAll() {
tasks := make([]*reduceTask, 0, len(rtm.tasks))
for _, task := range rtm.tasks {
tasks = append(tasks, task)
}

for _, task := range tasks {
close(task.inputCh)
}
}
Expand Down
18 changes: 6 additions & 12 deletions pkg/reducestreamer/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ func (rtm *reduceStreamTaskManager) CreateTask(ctx context.Context, request *v1.
// write the output to the output channel, service will forward it to downstream
rtm.responseCh <- task.buildReduceResponse(message)
}
// send EOF
rtm.responseCh <- task.buildEOFResponse()
}()

reduceStreamerHandle := rtm.creatorHandle.Create()
Expand Down Expand Up @@ -141,26 +139,22 @@ func (rtm *reduceStreamTaskManager) OutputChannel() <-chan *v1.ReduceResponse {

// WaitAll waits for all the reduceStream tasks to complete.
func (rtm *reduceStreamTaskManager) WaitAll() {
tasks := make([]*reduceStreamTask, 0, len(rtm.tasks))
var eofResponse *v1.ReduceResponse
for _, task := range rtm.tasks {
tasks = append(tasks, task)
}

for _, task := range tasks {
<-task.doneCh
if eofResponse == nil {
eofResponse = task.buildEOFResponse()
}
}
rtm.responseCh <- eofResponse

Comment on lines +142 to +150
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a behaviour change? Before, each task, if not merged, sends its own EOFResponse to responseCh. After, when receiving an EOF, we loop through the active tasks and only send the last EOFResponse.

Copy link
Member

@KeranYang KeranYang Mar 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should include verifying EOF in our unit tests. Currently, we are not.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am approving as this is sort of urgent. We can address the unit test comment as a separate issue: #113

// after all the tasks are completed, close the output channel
close(rtm.responseCh)
}

// CloseAll closes all the reduceStream tasks.
func (rtm *reduceStreamTaskManager) CloseAll() {
tasks := make([]*reduceStreamTask, 0, len(rtm.tasks))
for _, task := range rtm.tasks {
tasks = append(tasks, task)
}

for _, task := range tasks {
close(task.inputCh)
}
}
Expand Down
20 changes: 20 additions & 0 deletions pkg/sessionreducer/examples/sum/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
####################################################################################################
# base
####################################################################################################
FROM alpine:3.12.3 as base
RUN apk update && apk upgrade && \
apk add ca-certificates && \
apk --no-cache add tzdata

COPY dist/sum-example /bin/sum-example
RUN chmod +x /bin/sum-example

####################################################################################################
# counter
####################################################################################################
FROM scratch as sum
ARG ARCH
COPY --from=base /usr/share/zoneinfo /usr/share/zoneinfo
COPY --from=base /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
COPY --from=base /bin/sum-example /bin/sum-example
ENTRYPOINT [ "/bin/sum-example" ]
18 changes: 18 additions & 0 deletions pkg/sessionreducer/examples/sum/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
TAG ?= stable
PUSH ?= false

.PHONY: build
build:
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -v -o ./dist/sum-example main.go

.PHONY: image-push
image-push: build
docker buildx build -t "quay.io/numaio/numaflow-go/session-sum:${TAG}" --platform linux/amd64,linux/arm64 --target sum . --push

.PHONY: image
image: build
docker build -t "quay.io/numaio/numaflow-go/session-sum:${TAG}" --target sum .
@if [ "$(PUSH)" = "true" ]; then docker push "quay.io/numaio/numaflow-go/session-sum:${TAG}"; fi

clean:
-rm -rf ./dist
3 changes: 3 additions & 0 deletions pkg/sessionreducer/examples/sum/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Sum

An example User Defined Function that computes sum of events.
21 changes: 21 additions & 0 deletions pkg/sessionreducer/examples/sum/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
module counter

go 1.20

replace github.com/numaproj/numaflow-go => ../../../..

require (
github.com/numaproj/numaflow-go v0.6.1-0.20240212202512-715955f1e069
go.uber.org/atomic v1.11.0
)

require (
github.com/golang/protobuf v1.5.3 // indirect
golang.org/x/net v0.9.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/text v0.9.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect
google.golang.org/grpc v1.57.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
)
32 changes: 32 additions & 0 deletions pkg/sessionreducer/examples/sum/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/numaproj/numaflow-go v0.6.1-0.20240212202512-715955f1e069 h1:LBMIrUytoxuZazEnwxo+WPAxjHL1rrVhphTyByLiDYY=
github.com/numaproj/numaflow-go v0.6.1-0.20240212202512-715955f1e069/go.mod h1:WoMt31+h3up202zTRI8c/qe42B8UbvwLe2mJH0MAlhI=
github.com/numaproj/numaflow-go v0.6.1-0.20240221033908-7c1198bc89f1 h1:yY1ZjhfqdxE9H4T9YRff+GJHilv9CJgslWEYfsE/EPo=
github.com/numaproj/numaflow-go v0.6.1-0.20240221033908-7c1198bc89f1/go.mod h1:WoMt31+h3up202zTRI8c/qe42B8UbvwLe2mJH0MAlhI=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM=
golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 h1:0nDDozoAU19Qb2HwhXadU8OcsiO/09cnTqhUtq2MEOM=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA=
google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw=
google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
59 changes: 59 additions & 0 deletions pkg/sessionreducer/examples/sum/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package main

import (
"context"
"fmt"
"log"
"strconv"

"go.uber.org/atomic"

"github.com/numaproj/numaflow-go/pkg/sessionreducer"
)

// Sum is a simple session reducer which computes sum of events in a session.
type Sum struct {
sum *atomic.Int32
}

func (c *Sum) SessionReduce(ctx context.Context, keys []string, input <-chan sessionreducer.Datum, outputCh chan<- sessionreducer.Message) {
for d := range input {
val, err := strconv.Atoi(string(d.Value()))
if err != nil {
log.Panic("unable to convert the value to int: ", err.Error())
} else {
c.sum.Add(int32(val))
}
}
outputCh <- sessionreducer.NewMessage([]byte(fmt.Sprintf("%d", c.sum.Load()))).WithKeys(keys)
}

func (c *Sum) Accumulator(ctx context.Context) []byte {
return []byte(strconv.Itoa(int(c.sum.Load())))
}

func (c *Sum) MergeAccumulator(ctx context.Context, accumulator []byte) {
val, err := strconv.Atoi(string(accumulator))
if err != nil {
log.Println("unable to convert the accumulator value to int: ", err.Error())
return
}
c.sum.Add(int32(val))
}

func NewSessionCounter() sessionreducer.SessionReducer {
return &Sum{
sum: atomic.NewInt32(0),
}
}

// SessionCounterCreator is the creator for the session reducer.
type SessionCounterCreator struct{}

func (s *SessionCounterCreator) Create() sessionreducer.SessionReducer {
return NewSessionCounter()
}

func main() {
sessionreducer.NewServer(&SessionCounterCreator{}).Start(context.Background())
}
Loading