Skip to content

Commit d3dfe2f

Browse files
authored
added nats helper
1 parent f3cd474 commit d3dfe2f

File tree

1 file changed

+51
-0
lines changed

1 file changed

+51
-0
lines changed

nats_helper/nats.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package nats_helper
2+
3+
import (
4+
"github.com/nats-io/nats.go"
5+
"github.com/nats-io/nats.go/jetstream"
6+
"go.uber.org/zap"
7+
)
8+
9+
func NewNatsJetStream(nc *nats.EncodedConn, logger *zap.SugaredLogger) (jetstream.JetStream, error) {
10+
js, err := jetstream.New(nc.Conn)
11+
if err != nil {
12+
logger.Error("failed to create nats jetstream", zap.Error(err))
13+
return nil, err
14+
}
15+
16+
return js, nil
17+
}
18+
19+
20+
func NewNatsConn(natsUrl string, logger *zap.SugaredLogger) (*nats.EncodedConn, error) {
21+
nc, err := nats.Connect(
22+
natsUrl,
23+
nats.RetryOnFailedConnect(true),
24+
//nats.MaxReconnects(100),
25+
nats.PingInterval(time.Second*30),
26+
nats.ReconnectWait(time.Second),
27+
nats.ReconnectHandler(func(conn *nats.Conn) {
28+
logger.Infof("attempting to connect to nats server %s", natsUrl)
29+
}),
30+
nats.DisconnectErrHandler(func(c *nats.Conn, error error) {
31+
logger.Errorf("disconnected from nats %v", error)
32+
return
33+
}),
34+
nats.ClosedHandler(func(c *nats.Conn) {
35+
logger.Errorf("connection closed")
36+
return
37+
}))
38+
39+
if err != nil {
40+
logger.Errorf("failed to connect to nats server %s: %v", natsUrl, err)
41+
return nil, err
42+
}
43+
44+
enc, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
45+
if err != nil {
46+
logger.Errorf("failed to create json encoder for nats client: %v", err)
47+
return nil, err
48+
}
49+
50+
return enc, nil
51+
}

0 commit comments

Comments
 (0)