Skip to content

Commit

Permalink
feat: add close function, start transactions when one is not provided
Browse files Browse the repository at this point in the history
Signed-off-by: Sarah Funkhouser <[email protected]>
  • Loading branch information
golanglemonade committed Sep 26, 2024
1 parent 3bc7b3a commit 89017c5
Showing 1 changed file with 93 additions and 8 deletions.
101 changes: 93 additions & 8 deletions pkg/riverqueue/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type JobClient interface {
// this can be used to interact directly with the river client for more advanced use cases (e.g. starting the river server)
// which are outside the scope of the insert-only client interface
GetRiverClient() *river.Client[pgx.Tx]
// Close closes the underlying pgx pool
Close() error
}

// Config settings for the river client
Expand Down Expand Up @@ -121,19 +123,64 @@ func (c *Client) GetPool() *pgxpool.Pool {
return c.pool
}

// Close satisfies the JobClient interface
func (c *Client) Close() error {
c.pool.Close()

return nil
}

// GetRiverClient returns the underlying river client
func (c *Client) GetRiverClient() *river.Client[pgx.Tx] {
return c.riverClient
}

// Insert satisfies the JobClient interface
// Insert satisfies the JobClient interface and inserts a new job with the provided args and opts
// it will start a new transaction and insert the job
func (c *Client) Insert(ctx context.Context, args river.JobArgs, opts *river.InsertOpts) (*rivertype.JobInsertResult, error) {
return c.riverClient.Insert(ctx, args, opts)
tx, err := c.pool.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
return nil, err
}

res, err := c.riverClient.InsertTx(ctx, tx, args, opts)
if err != nil {
if err := tx.Rollback(ctx); err != nil {
return nil, err
}

return nil, err
}

if err := tx.Commit(ctx); err != nil {
return nil, err
}

return res, nil
}

// InsertMany satisfies the JobClient interface
// InsertMany satisfies the JobClient interface and inserts many jobs at once
// it will start a new transaction and insert the jobs
func (c *Client) InsertMany(ctx context.Context, params []river.InsertManyParams) ([]*rivertype.JobInsertResult, error) {
return c.riverClient.InsertMany(ctx, params)
tx, err := c.pool.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
return nil, err
}

res, err := c.riverClient.InsertManyTx(ctx, tx, params)
if err != nil {
if err := tx.Rollback(ctx); err != nil {
return nil, err
}

return nil, err
}

if err := tx.Commit(ctx); err != nil {
return nil, err
}

return res, nil
}

// InsertManyTx satisfies the JobClient interface
Expand All @@ -146,19 +193,57 @@ func (c *Client) InsertTx(ctx context.Context, tx pgx.Tx, args river.JobArgs, op
return c.riverClient.InsertTx(ctx, tx, args, opts)
}

// InsertManyFast satisfies the JobClient interface
// InsertManyFast satisfies the JobClient interface and inserts many jobs at once using Postgres' `COPY FROM` mechanism
// it will start a new transaction and insert the jobs and commit the transaction after the insert
func (c *Client) InsertManyFast(ctx context.Context, params []river.InsertManyParams) (int, error) {
return c.riverClient.InsertManyFast(ctx, params)
tx, err := c.pool.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
return 0, err
}

res, err := c.riverClient.InsertManyFastTx(ctx, tx, params)
if err != nil {
if err := tx.Rollback(ctx); err != nil {
return 0, err
}

return 0, err
}

if err := tx.Commit(ctx); err != nil {
return 0, err
}

return res, nil
}

// InsertManyFastTx satisfies the JobClient interface
func (c *Client) InsertManyFastTx(ctx context.Context, tx pgx.Tx, params []river.InsertManyParams) (int, error) {
return c.riverClient.InsertManyFastTx(ctx, tx, params)
}

// JobCancel satisfies the JobClient interface
// JobCancel satisfies the JobClient interface and cancels the job with the given ID
// it will start a new transaction and cancel the job and commit the transaction
func (c *Client) JobCancel(ctx context.Context, jobID int64) (*rivertype.JobRow, error) {
return c.riverClient.JobCancel(ctx, jobID)
tx, err := c.pool.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
return nil, err
}

res, err := c.riverClient.JobCancelTx(ctx, tx, jobID)
if err != nil {
if err := tx.Rollback(ctx); err != nil {
return nil, err
}

return nil, err
}

if err := tx.Commit(ctx); err != nil {
return nil, err
}

return res, nil
}

// JobCancelTx satisfies the JobClient interface
Expand Down

0 comments on commit 89017c5

Please sign in to comment.