4 Commits

Author SHA1 Message Date
  guzeng b64b4697ca 修改SendMsg发送 1 year ago
  guzeng 6db5138ede 增加SendMsg方法 1 year ago
  loshiqi 2289559d7a mq发送示例 1 year ago
  loshiqi c72d66d083 增加测试方法 1 year ago
2 changed files with 75 additions and 9 deletions
Split View
  1. +45
    -9
      conn.go
  2. +30
    -0
      conn_test.go

+ 45
- 9
conn.go View File

@ -1,25 +1,61 @@
package rocketmq
import (
"context"
"errors"
"log"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"log"
)
var MqPool *rocketmq.Producer
var MqPool rocketmq.Producer
func MqConnect() {
MqPool, err := rocketmq.NewProducer(
producer.WithGroupName("ProductService"),
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
func MqConnect(group string, resolver []string) {
p, err := rocketmq.NewProducer(
producer.WithGroupName(group),
producer.WithNsResolver(primitive.NewPassthroughResolver(resolver)),
//producer.WithCreateTopicKey("product"),
producer.WithRetry(1))
if err != nil {
log.Println("New ProductService producer failed", err)
log.Println("New "+group+" producer failed", err)
return
}
err = MqPool.Start()
err = p.Start()
if err != nil {
log.Println("ProductService producer start failed", err)
log.Println(group+" producer start failed", err)
return
}
MqPool = p
}
func SendSync(ctx context.Context, mq *primitive.Message) (*primitive.SendResult, error) {
var res primitive.SendResult
if MqPool == nil {
return &res, errors.New("Mq未链接")
}
return MqPool.SendSync(ctx, mq)
}
func Conn(group string, resolver []string) {
MqConnect(group, resolver)
}
/**
* 发送mq消息
* topic
* tag
* message json格式化
* 2023/02/17
*/
func SendMsg(ctx context.Context, topic, tag string, message []byte) (*primitive.SendResult, error) {
if MqPool == nil {
return nil, errors.New("Mq未链接")
}
msg := primitive.NewMessage(topic, message)
msg.WithTag(tag)
return MqPool.SendSync(ctx, msg)
}

+ 30
- 0
conn_test.go View File

@ -0,0 +1,30 @@
package rocketmq
import (
"encoding/json"
"github.com/apache/rocketmq-client-go/v2/primitive"
"log"
"testing"
// "tgo/helper"
"context"
)
func Test_Send(t *testing.T) {
MqConnect("ProductService", []string{"127.0.0.1:9876"})
log.Println("2", MqPool)
mq_data := map[string]interface{}{
"a": "a",
}
topic := "product"
tag := "collect_product"
msg_str, _ := json.Marshal(mq_data)
msg1 := primitive.NewMessage(topic, msg_str)
msg1.WithTag(tag)
res, err := SendSync(context.Background(), msg1)
if err != nil {
log.Printf("send tag sync message error:%s\n", err)
} else {
log.Printf("send tag sync message success. result=%s\n", res.String())
}
}

Loading…
Cancel
Save