3 Commits

Author SHA1 Message Date
  guzeng b64b4697ca 修改SendMsg发送 1 year ago
  guzeng 6db5138ede 增加SendMsg方法 1 year ago
  loshiqi 2289559d7a mq发送示例 1 year ago
2 changed files with 35 additions and 2 deletions
Split View
  1. +34
    -1
      conn.go
  2. +1
    -1
      conn_test.go

+ 34
- 1
conn.go View File

@ -1,10 +1,13 @@
package rocketmq
import (
"context"
"errors"
"log"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"log"
)
var MqPool rocketmq.Producer
@ -26,3 +29,33 @@ func MqConnect(group string, resolver []string) {
}
MqPool = p
}
func SendSync(ctx context.Context, mq *primitive.Message) (*primitive.SendResult, error) {
var res primitive.SendResult
if MqPool == nil {
return &res, errors.New("Mq未链接")
}
return MqPool.SendSync(ctx, mq)
}
func Conn(group string, resolver []string) {
MqConnect(group, resolver)
}
/**
* 发送mq消息
* topic
* tag
* message json格式化
* 2023/02/17
*/
func SendMsg(ctx context.Context, topic, tag string, message []byte) (*primitive.SendResult, error) {
if MqPool == nil {
return nil, errors.New("Mq未链接")
}
msg := primitive.NewMessage(topic, message)
msg.WithTag(tag)
return MqPool.SendSync(ctx, msg)
}

+ 1
- 1
conn_test.go View File

@ -20,7 +20,7 @@ func Test_Send(t *testing.T) {
msg_str, _ := json.Marshal(mq_data)
msg1 := primitive.NewMessage(topic, msg_str)
msg1.WithTag(tag)
res, err := MqPool.SendSync(context.Background(), msg1)
res, err := SendSync(context.Background(), msg1)
if err != nil {
log.Printf("send tag sync message error:%s\n", err)
} else {


Loading…
Cancel
Save