rocketmq方法
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

61 lines
1.3 KiB

1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
  1. package rocketmq
  2. import (
  3. "context"
  4. "errors"
  5. "log"
  6. "github.com/apache/rocketmq-client-go/v2"
  7. "github.com/apache/rocketmq-client-go/v2/primitive"
  8. "github.com/apache/rocketmq-client-go/v2/producer"
  9. )
  10. var MqPool rocketmq.Producer
  11. func MqConnect(group string, resolver []string) {
  12. p, err := rocketmq.NewProducer(
  13. producer.WithGroupName(group),
  14. producer.WithNsResolver(primitive.NewPassthroughResolver(resolver)),
  15. //producer.WithCreateTopicKey("product"),
  16. producer.WithRetry(1))
  17. if err != nil {
  18. log.Println("New "+group+" producer failed", err)
  19. return
  20. }
  21. err = p.Start()
  22. if err != nil {
  23. log.Println(group+" producer start failed", err)
  24. return
  25. }
  26. MqPool = p
  27. }
  28. func SendSync(ctx context.Context, mq *primitive.Message) (*primitive.SendResult, error) {
  29. var res primitive.SendResult
  30. if MqPool == nil {
  31. return &res, errors.New("Mq未链接")
  32. }
  33. return MqPool.SendSync(ctx, mq)
  34. }
  35. func Conn(group string, resolver []string) {
  36. MqConnect(group, resolver)
  37. }
  38. /**
  39. * 发送mq消息
  40. * topic
  41. * tag
  42. * message json格式化
  43. * 2023/02/17
  44. */
  45. func SendMsg(ctx context.Context, topic, tag string, message []byte) (*primitive.SendResult, error) {
  46. if MqPool == nil {
  47. return nil, errors.New("Mq未链接")
  48. }
  49. msg := primitive.NewMessage(topic, message)
  50. msg.WithTag(tag)
  51. return MqPool.SendSync(ctx, msg)
  52. }