From c72d66d08327599b3974eb90db03a3f502ace546 Mon Sep 17 00:00:00 2001 From: loshiqi <553578653@qq.com> Date: Thu, 3 Nov 2022 14:58:56 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=B5=8B=E8=AF=95=E6=96=B9?= =?UTF-8?q?=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- conn.go | 19 +++++++++++-------- conn_test.go | 30 ++++++++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 8 deletions(-) create mode 100644 conn_test.go diff --git a/conn.go b/conn.go index d0deb3c..46d172f 100644 --- a/conn.go +++ b/conn.go @@ -7,19 +7,22 @@ import ( "log" ) -var MqPool *rocketmq.Producer +var MqPool rocketmq.Producer -func MqConnect() { - MqPool, err := rocketmq.NewProducer( - producer.WithGroupName("ProductService"), - producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})), +func MqConnect(group string, resolver []string) { + p, err := rocketmq.NewProducer( + producer.WithGroupName(group), + producer.WithNsResolver(primitive.NewPassthroughResolver(resolver)), //producer.WithCreateTopicKey("product"), producer.WithRetry(1)) if err != nil { - log.Println("New ProductService producer failed", err) + log.Println("New "+group+" producer failed", err) + return } - err = MqPool.Start() + err = p.Start() if err != nil { - log.Println("ProductService producer start failed", err) + log.Println(group+" producer start failed", err) + return } + MqPool = p } diff --git a/conn_test.go b/conn_test.go new file mode 100644 index 0000000..921f6ca --- /dev/null +++ b/conn_test.go @@ -0,0 +1,30 @@ +package rocketmq + +import ( + "encoding/json" + "github.com/apache/rocketmq-client-go/v2/primitive" + "log" + "testing" + // "tgo/helper" + "context" +) + +func Test_Send(t *testing.T) { + MqConnect("ProductService", []string{"127.0.0.1:9876"}) + log.Println("2", MqPool) + mq_data := map[string]interface{}{ + "a": "a", + } + topic := "product" + tag := "collect_product" + msg_str, _ := json.Marshal(mq_data) + msg1 := primitive.NewMessage(topic, msg_str) + msg1.WithTag(tag) + res, err := MqPool.SendSync(context.Background(), msg1) + if err != nil { + log.Printf("send tag sync message error:%s\n", err) + } else { + log.Printf("send tag sync message success. result=%s\n", res.String()) + } + +}