package rocketmq import ( "context" "errors" "log" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" ) var MqPool rocketmq.Producer func MqConnect(group string, resolver []string) { p, err := rocketmq.NewProducer( producer.WithGroupName(group), producer.WithNsResolver(primitive.NewPassthroughResolver(resolver)), //producer.WithCreateTopicKey("product"), producer.WithRetry(1)) if err != nil { log.Println("New "+group+" producer failed", err) return } err = p.Start() if err != nil { log.Println(group+" producer start failed", err) return } MqPool = p } func SendSync(ctx context.Context, mq *primitive.Message) (*primitive.SendResult, error) { var res primitive.SendResult if MqPool == nil { return &res, errors.New("Mq未链接") } return MqPool.SendSync(ctx, mq) } func Conn(group string, resolver []string) { MqConnect(group, resolver) } /** * 发送mq消息 * topic * tag * message json格式化 * 2023/02/17 */ func SendMsg(ctx context.Context, topic, tag string, message []byte) (*primitive.SendResult, error) { if MqPool == nil { return nil, errors.New("Mq未链接") } msg := primitive.NewMessage(topic, message) msg.WithTag(tag) return MqPool.SendSync(ctx, msg) }