rocketmq方法
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

61 lines
1.3 KiB

package rocketmq
import (
"context"
"errors"
"log"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)
var MqPool rocketmq.Producer
func MqConnect(group string, resolver []string) {
p, err := rocketmq.NewProducer(
producer.WithGroupName(group),
producer.WithNsResolver(primitive.NewPassthroughResolver(resolver)),
//producer.WithCreateTopicKey("product"),
producer.WithRetry(1))
if err != nil {
log.Println("New "+group+" producer failed", err)
return
}
err = p.Start()
if err != nil {
log.Println(group+" producer start failed", err)
return
}
MqPool = p
}
func SendSync(ctx context.Context, mq *primitive.Message) (*primitive.SendResult, error) {
var res primitive.SendResult
if MqPool == nil {
return &res, errors.New("Mq未链接")
}
return MqPool.SendSync(ctx, mq)
}
func Conn(group string, resolver []string) {
MqConnect(group, resolver)
}
/**
* 发送mq消息
* topic
* tag
* message json格式化
* 2023/02/17
*/
func SendMsg(ctx context.Context, topic, tag string, message []byte) (*primitive.SendResult, error) {
if MqPool == nil {
return nil, errors.New("Mq未链接")
}
msg := primitive.NewMessage(topic, message)
msg.WithTag(tag)
return MqPool.SendSync(ctx, msg)
}