228 lines
4.3 KiB
Go
228 lines
4.3 KiB
Go
|
package ceen
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"encoding/json"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"github.com/kjuulh/ceen/codec"
|
||
|
"github.com/nats-io/nats.go"
|
||
|
"strings"
|
||
|
)
|
||
|
|
||
|
const (
|
||
|
eventTypeHdr = "ceen-type"
|
||
|
eventCodecHdr = "ceen-codec"
|
||
|
)
|
||
|
|
||
|
type EventStore struct {
|
||
|
c *Ceen
|
||
|
name string
|
||
|
}
|
||
|
|
||
|
func (es *EventStore) Create(conf *nats.StreamConfig) error {
|
||
|
if conf == nil {
|
||
|
conf = &nats.StreamConfig{}
|
||
|
}
|
||
|
conf.Name = es.name
|
||
|
|
||
|
if len(conf.Subjects) == 0 {
|
||
|
conf.Subjects = []string{fmt.Sprintf("%s.>", es.name)}
|
||
|
}
|
||
|
_, err := es.c.js.AddStream(conf)
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
func (es *EventStore) Append(ctx context.Context, subject string, events ...*Event) (uint64, error) {
|
||
|
var ack *nats.PubAck
|
||
|
|
||
|
for _, event := range events {
|
||
|
popts := []nats.PubOpt{
|
||
|
nats.Context(ctx),
|
||
|
nats.ExpectStream(es.name),
|
||
|
}
|
||
|
|
||
|
e, err := es.wrapEvent(event)
|
||
|
if err != nil {
|
||
|
return 0, err
|
||
|
}
|
||
|
|
||
|
msg, err := es.packEvent(subject, e)
|
||
|
if err != nil {
|
||
|
return 0, err
|
||
|
}
|
||
|
|
||
|
ack, err = es.c.js.PublishMsg(msg, popts...)
|
||
|
if err != nil {
|
||
|
if strings.Contains(err.Error(), "wrong last sequence") {
|
||
|
return 0, errors.New("wrong last sequence")
|
||
|
}
|
||
|
return 0, err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return ack.Sequence, nil
|
||
|
}
|
||
|
|
||
|
func (es *EventStore) wrapEvent(event *Event) (*Event, error) {
|
||
|
if event.Data == nil {
|
||
|
return nil, errors.New("event data is required")
|
||
|
}
|
||
|
|
||
|
if es.c.types == nil {
|
||
|
if event.Type == "" {
|
||
|
return nil, errors.New("event type is required")
|
||
|
}
|
||
|
} else {
|
||
|
t, err := es.c.types.Lookup(event.Data)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
if event.Type == "" {
|
||
|
event.Type = t
|
||
|
} else if event.Type != t {
|
||
|
return nil, fmt.Errorf("wrong type for event data: %s", event.Data)
|
||
|
}
|
||
|
}
|
||
|
if v, ok := event.Data.(validator); ok {
|
||
|
if err := v.Validate(); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
}
|
||
|
if event.ID == "" {
|
||
|
event.ID = es.c.id.New()
|
||
|
}
|
||
|
|
||
|
return event, nil
|
||
|
}
|
||
|
|
||
|
func (es *EventStore) packEvent(subject string, event *Event) (*nats.Msg, error) {
|
||
|
var (
|
||
|
data []byte
|
||
|
err error
|
||
|
codecName string
|
||
|
)
|
||
|
|
||
|
if es.c.types == nil {
|
||
|
data, err = codec.Binary.Marshal(event.Data)
|
||
|
codecName = codec.Binary.Name()
|
||
|
} else {
|
||
|
data, err = es.c.types.Marshal(event.Data)
|
||
|
codecName = es.c.types.Codec().Name()
|
||
|
}
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
msg := nats.NewMsg(subject)
|
||
|
msg.Data = data
|
||
|
|
||
|
msg.Header.Set(nats.MsgIdHdr, event.ID)
|
||
|
msg.Header.Set(eventTypeHdr, event.Type)
|
||
|
msg.Header.Set(eventCodecHdr, codecName)
|
||
|
|
||
|
return msg, nil
|
||
|
}
|
||
|
|
||
|
type loadOpts struct {
|
||
|
afterSeq *uint64
|
||
|
}
|
||
|
|
||
|
type natsApiError struct {
|
||
|
Code int `json:"code"`
|
||
|
ErrCode uint16 `json:"err_code"`
|
||
|
Description string `json:"description"`
|
||
|
}
|
||
|
|
||
|
type natsStoredMsg struct {
|
||
|
Sequence uint64 `json:"seq"`
|
||
|
}
|
||
|
|
||
|
type natsGetMsgRequest struct {
|
||
|
LastBySubject string `json:"last_by_subj"`
|
||
|
}
|
||
|
|
||
|
type natsGetMsgResponse struct {
|
||
|
Type string `json:"type"`
|
||
|
Error *natsApiError `json:"error"`
|
||
|
Message *natsStoredMsg `json:"message"`
|
||
|
}
|
||
|
|
||
|
func (es *EventStore) Load(ctx context.Context, subject string) ([]*Event, uint64, error) {
|
||
|
lastMsg, err := es.lastMsgForSubject(ctx, subject)
|
||
|
if err != nil {
|
||
|
return nil, 0, err
|
||
|
}
|
||
|
|
||
|
if lastMsg.Sequence == 0 {
|
||
|
return nil, 0, nil
|
||
|
}
|
||
|
|
||
|
sopts := []nats.SubOpt{
|
||
|
nats.OrderedConsumer(),
|
||
|
nats.DeliverAll(),
|
||
|
}
|
||
|
|
||
|
sub, err := es.c.js.SubscribeSync(subject, sopts...)
|
||
|
if err != nil {
|
||
|
return nil, 0, err
|
||
|
}
|
||
|
|
||
|
defer sub.Unsubscribe()
|
||
|
|
||
|
var events []*Event
|
||
|
|
||
|
for {
|
||
|
msg, err := sub.NextMsgWithContext(ctx)
|
||
|
if err != nil {
|
||
|
return nil, 0, err
|
||
|
}
|
||
|
|
||
|
event, err := es.c.UnpackEvent(msg)
|
||
|
if err != nil {
|
||
|
return nil, 0, err
|
||
|
}
|
||
|
|
||
|
events = append(events, event)
|
||
|
if event.Sequence == lastMsg.Sequence {
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return events, lastMsg.Sequence, nil
|
||
|
}
|
||
|
|
||
|
func (es *EventStore) lastMsgForSubject(ctx context.Context, subject string) (*natsStoredMsg, error) {
|
||
|
rsubject := fmt.Sprintf("$JS.API.STREAM.MSG.GET.%s", es.name)
|
||
|
|
||
|
data, _ := json.Marshal(&natsGetMsgRequest{
|
||
|
LastBySubject: subject,
|
||
|
})
|
||
|
|
||
|
msg, err := es.c.nc.RequestWithContext(ctx, rsubject, data)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
var rep natsGetMsgResponse
|
||
|
err = json.Unmarshal(msg.Data, &rep)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
if rep.Error != nil {
|
||
|
if rep.Error.Code == 404 {
|
||
|
return &natsStoredMsg{}, nil
|
||
|
}
|
||
|
|
||
|
return nil, fmt.Errorf("%s (%d)", rep.Error.Description, rep.Error.Code)
|
||
|
}
|
||
|
|
||
|
return rep.Message, nil
|
||
|
}
|
||
|
|
||
|
func (es *EventStore) Delete() error {
|
||
|
return es.c.js.DeleteStream(es.name)
|
||
|
}
|