rabbitmq操作
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

105 lines
2.2 KiB

3 years ago
  1. package rabbitmq
  2. import (
  3. "github.com/streadway/amqp"
  4. )
  5. /**
  6. * 发送消息
  7. * @param exchange 交换机名
  8. * @param extype 消息类型
  9. * @param route 路由key
  10. * @param body 内容
  11. */
  12. func Send(extype string, exchange string, route string, body []byte, extend ...bool) error {
  13. var durable, auto_delete, no_wait, internal bool = true, false, false, false
  14. if len(extend) > 0 {
  15. durable = extend[0]
  16. }
  17. if len(extend) > 1 {
  18. auto_delete = extend[1]
  19. }
  20. if len(extend) > 2 {
  21. no_wait = extend[2]
  22. }
  23. if len(extend) > 3 {
  24. internal = extend[3]
  25. }
  26. var err error
  27. err = Chan.ExchangeDeclare(
  28. exchange, // name
  29. extype, // type
  30. durable, // durable
  31. auto_delete, // auto-deleted
  32. internal, // internal
  33. no_wait, // no-wait
  34. nil, // arguments
  35. )
  36. if err != nil {
  37. return err
  38. }
  39. err = Chan.Publish(
  40. exchange, // exchange
  41. route, // routing key
  42. false, // mandatory
  43. false, // immediate
  44. amqp.Publishing{
  45. ContentType: "text/plain",
  46. Body: body,
  47. })
  48. return err
  49. }
  50. /**
  51. * 发送消息
  52. * @param exchange 交换机名
  53. * @param extype 消息类型
  54. * @param route 路由key
  55. * @param body 内容
  56. * @param delay 延迟多少毫秒
  57. */
  58. func SendDelay(extype string, exchange string, route string, body []byte, delay int64, extend ...bool) error {
  59. var durable, auto_delete, no_wait, internal bool = true, false, false, false
  60. if len(extend) > 0 {
  61. durable = extend[0]
  62. }
  63. if len(extend) > 1 {
  64. auto_delete = extend[1]
  65. }
  66. if len(extend) > 2 {
  67. no_wait = extend[2]
  68. }
  69. if len(extend) > 3 {
  70. internal = extend[3]
  71. }
  72. var err error
  73. err = Chan.ExchangeDeclare(
  74. exchange, // name
  75. "x-delayed-message", // type
  76. durable, // durable
  77. auto_delete, // auto-deleted
  78. internal, // internal
  79. no_wait, // no-wait
  80. amqp.Table{"x-delayed-type": extype}, // arguments
  81. )
  82. if err != nil {
  83. return err
  84. }
  85. err = Chan.Publish(
  86. exchange, // exchange
  87. route, // routing key
  88. false, // mandatory
  89. false, // immediate
  90. amqp.Publishing{
  91. Headers: amqp.Table{"x-delay": delay},
  92. ContentType: "text/plain",
  93. Body: body,
  94. })
  95. return err
  96. }