|
|
- package rocketmq
-
- import (
- "context"
- "errors"
- "log"
-
- "github.com/apache/rocketmq-client-go/v2"
- "github.com/apache/rocketmq-client-go/v2/primitive"
- "github.com/apache/rocketmq-client-go/v2/producer"
- )
-
- var MqPool rocketmq.Producer
-
- func MqConnect(group string, resolver []string) {
- p, err := rocketmq.NewProducer(
- producer.WithGroupName(group),
- producer.WithNsResolver(primitive.NewPassthroughResolver(resolver)),
- //producer.WithCreateTopicKey("product"),
- producer.WithRetry(1))
- if err != nil {
- log.Println("New "+group+" producer failed", err)
- return
- }
- err = p.Start()
- if err != nil {
- log.Println(group+" producer start failed", err)
- return
- }
- MqPool = p
- }
-
- func SendSync(ctx context.Context, mq *primitive.Message) (*primitive.SendResult, error) {
- var res primitive.SendResult
- if MqPool == nil {
- return &res, errors.New("Mq未链接")
- }
- return MqPool.SendSync(ctx, mq)
- }
-
- func Conn(group string, resolver []string) {
- MqConnect(group, resolver)
- }
-
- /**
- * 发送mq消息
- * topic
- * tag
- * message json格式化
- * 2023/02/17
- */
- func SendMsg(ctx context.Context, topic, tag string, message []byte) (*primitive.SendResult, error) {
-
- if MqPool == nil {
- return nil, errors.New("Mq未链接")
- }
- msg := primitive.NewMessage(topic, message)
- msg.WithTag(tag)
- return MqPool.SendSync(ctx, msg)
-
- }
|