|
|
@ -1,25 +1,61 @@ |
|
|
|
package rocketmq |
|
|
|
|
|
|
|
import ( |
|
|
|
"context" |
|
|
|
"errors" |
|
|
|
"log" |
|
|
|
|
|
|
|
"github.com/apache/rocketmq-client-go/v2" |
|
|
|
"github.com/apache/rocketmq-client-go/v2/primitive" |
|
|
|
"github.com/apache/rocketmq-client-go/v2/producer" |
|
|
|
"log" |
|
|
|
) |
|
|
|
|
|
|
|
var MqPool *rocketmq.Producer |
|
|
|
var MqPool rocketmq.Producer |
|
|
|
|
|
|
|
func MqConnect() { |
|
|
|
MqPool, err := rocketmq.NewProducer( |
|
|
|
producer.WithGroupName("ProductService"), |
|
|
|
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})), |
|
|
|
func MqConnect(group string, resolver []string) { |
|
|
|
p, err := rocketmq.NewProducer( |
|
|
|
producer.WithGroupName(group), |
|
|
|
producer.WithNsResolver(primitive.NewPassthroughResolver(resolver)), |
|
|
|
//producer.WithCreateTopicKey("product"),
|
|
|
|
producer.WithRetry(1)) |
|
|
|
if err != nil { |
|
|
|
log.Println("New ProductService producer failed", err) |
|
|
|
log.Println("New "+group+" producer failed", err) |
|
|
|
return |
|
|
|
} |
|
|
|
err = MqPool.Start() |
|
|
|
err = p.Start() |
|
|
|
if err != nil { |
|
|
|
log.Println("ProductService producer start failed", err) |
|
|
|
log.Println(group+" producer start failed", err) |
|
|
|
return |
|
|
|
} |
|
|
|
MqPool = p |
|
|
|
} |
|
|
|
|
|
|
|
func SendSync(ctx context.Context, mq *primitive.Message) (*primitive.SendResult, error) { |
|
|
|
var res primitive.SendResult |
|
|
|
if MqPool == nil { |
|
|
|
return &res, errors.New("Mq未链接") |
|
|
|
} |
|
|
|
return MqPool.SendSync(ctx, mq) |
|
|
|
} |
|
|
|
|
|
|
|
func Conn(group string, resolver []string) { |
|
|
|
MqConnect(group, resolver) |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 发送mq消息 |
|
|
|
* topic |
|
|
|
* tag |
|
|
|
* message json格式化 |
|
|
|
* 2023/02/17 |
|
|
|
*/ |
|
|
|
func SendMsg(ctx context.Context, topic, tag string, message []byte) (*primitive.SendResult, error) { |
|
|
|
|
|
|
|
if MqPool == nil { |
|
|
|
return nil, errors.New("Mq未链接") |
|
|
|
} |
|
|
|
msg := primitive.NewMessage(topic, message) |
|
|
|
msg.WithTag(tag) |
|
|
|
return MqPool.SendSync(ctx, msg) |
|
|
|
|
|
|
|
} |