Browse Source

init

master v0.1.0
guzeng 3 years ago
parent
commit
5acd6abca0
5 changed files with 380 additions and 0 deletions
  1. +62
    -0
      conn.go
  2. +55
    -0
      message.go
  3. +105
    -0
      send.go
  4. +19
    -0
      send_test.go
  5. +139
    -0
      variable.go

+ 62
- 0
conn.go View File

@ -0,0 +1,62 @@
package rabbitmq
import (
"errors"
"log"
"github.com/streadway/amqp"
)
var Conn *amqp.Connection
var Chan *amqp.Channel
func Connect(host, port, username, pwd, vhost string) error {
log.Println("RabbitMQ connecting...")
var err error
if username != "" && pwd != "" && host != "" && port != "" {
if vhost == "" {
vhost = "/"
}
Conn, err = amqp.Dial("amqp://" + username + ":" + pwd + "@" + host + ":" + port + "/" + vhost)
if err != nil {
log.Println("ERROR", "Failed to connect to RabbitMQ", err)
} else {
log.Println("RabbitMQ has connected")
}
// defer conn.Close()
} else {
log.Println("ERROR", "RabbitMQ connection params errors")
err = errors.New("RabbitMQ connection params errors")
}
return err
}
func Channel() error {
var err error
Chan, err = Conn.Channel()
if err != nil {
log.Println("ERROR", "Failed to open a channel")
} else {
log.Println("open a channel")
}
// defer ch.Close()
return err
}
func CloseChannel() error {
err := Chan.Close()
if err == nil {
log.Println("RabbitMQ channel closed")
}
return err
}
func CloseConn() error {
err := Conn.Close()
if err == nil {
log.Println("RabbitMQ connection closed")
}
return err
}

+ 55
- 0
message.go View File

@ -0,0 +1,55 @@
package rabbitmq
import (
"encoding/json"
)
/**
* 发送一条api通知消息
* @param exchange 交换机名
* @param extype 消息类型
* @param route 路由key
* @param body 内容
*/
func SendNotice(site_id, dbname, msg_id string) error {
msgData := map[string]interface{}{
"site_id": site_id,
"dbname": dbname,
"data": map[string]string{
"msg_id": msg_id,
},
}
msgDataJson, err := json.Marshal(msgData)
if err != nil {
return err
}
return Send("direct", SEND_MSG_EXCHANGE, SEND_API_MSG_KEY, msgDataJson)
}
/**
* 发布订单已支付消息
* @param exchange 交换机名
* @param extype 消息类型
* @param route 路由key
* @param body 内容
*/
func SendOrderPayed(site_id, dbname, order_id, order_sn, price string) error {
//发布订单已支付消息
orderData := map[string]interface{}{
"site_id": site_id,
"dbname": dbname,
"data": map[string]string{
"order_id": order_id,
"order_sn": order_sn,
"price": price,
},
}
orderDataJson, _ := json.Marshal(orderData)
return Send("direct", ORDER_EXCHANGE, ORDER_PAYED_KEY, orderDataJson)
}

+ 105
- 0
send.go View File

@ -0,0 +1,105 @@
package rabbitmq
import (
"github.com/streadway/amqp"
)
/**
* 发送消息
* @param exchange 交换机名
* @param extype 消息类型
* @param route 路由key
* @param body 内容
*/
func Send(extype string, exchange string, route string, body []byte, extend ...bool) error {
var durable, auto_delete, no_wait, internal bool = true, false, false, false
if len(extend) > 0 {
durable = extend[0]
}
if len(extend) > 1 {
auto_delete = extend[1]
}
if len(extend) > 2 {
no_wait = extend[2]
}
if len(extend) > 3 {
internal = extend[3]
}
var err error
err = Chan.ExchangeDeclare(
exchange, // name
extype, // type
durable, // durable
auto_delete, // auto-deleted
internal, // internal
no_wait, // no-wait
nil, // arguments
)
if err != nil {
return err
}
err = Chan.Publish(
exchange, // exchange
route, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: body,
})
return err
}
/**
* 发送消息
* @param exchange 交换机名
* @param extype 消息类型
* @param route 路由key
* @param body 内容
* @param delay 延迟多少毫秒
*/
func SendDelay(extype string, exchange string, route string, body []byte, delay int64, extend ...bool) error {
var durable, auto_delete, no_wait, internal bool = true, false, false, false
if len(extend) > 0 {
durable = extend[0]
}
if len(extend) > 1 {
auto_delete = extend[1]
}
if len(extend) > 2 {
no_wait = extend[2]
}
if len(extend) > 3 {
internal = extend[3]
}
var err error
err = Chan.ExchangeDeclare(
exchange, // name
"x-delayed-message", // type
durable, // durable
auto_delete, // auto-deleted
internal, // internal
no_wait, // no-wait
amqp.Table{"x-delayed-type": extype}, // arguments
)
if err != nil {
return err
}
err = Chan.Publish(
exchange, // exchange
route, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: amqp.Table{"x-delay": delay},
ContentType: "text/plain",
Body: body,
})
return err
}

+ 19
- 0
send_test.go View File

@ -0,0 +1,19 @@
package rabbitmq
import (
"testing"
)
func Test_Send(t *testing.T) {
err := Connect("127.0.0.1", "5672", "tetele", "fly123456@", "ttl")
t.Log(err)
err = Channel()
t.Log(err)
body := []byte("order is expire")
// err = Send("direct", "ex_order_delay", "order_expire", body)
err = SendDelay("direct", "ex_order_delay", "order_expire", body, 5000)
t.Log(err)
}

+ 139
- 0
variable.go View File

@ -0,0 +1,139 @@
package rabbitmq
// --订单--
//订单路由
const ORDER_EXCHANGE string = "ex_order"
//已创建订单route key
const ORDER_CREATED_KEY string = "created"
//订单已创建队列,订单服务用
const ORDER_CREATED_QUEUE = "order_created_queue"
//已创建订单队列,串货服务用
const CHANNEL_ORDER_CREATED_QUEUE = "channel_order_created_queue"
//已支付订单route key
const ORDER_PAYED_KEY string = "payed"
//订单已支付队列,订单服务用
const ORDER_PAYED_QUEUE = "order_payed_queue"
//订单已支付队列,串货服务用
const CHANNEL_ORDER_PAYED_QUEUE = "channel_order_payed_queue"
//订单状态更新route key
const ORDER_STATUS_CHANGE_KEY string = "status_change"
//订单状态更新队列,订单服务用
const ORDER_STATUS_UPDATE_QUEUE = "order_status_update_queue"
//订单状态更新,串货服务用
const CHANNEL_ORDER_STATUS_CHANGE_QUEUE = "channel_order_update_status"
//订单取消route key
const ORDER_CANCELED_KEY string = "canceled"
//订单取消队列,订单服务用
const ORDER_CANCELED_QUEUE = "order_canceled_queue"
//订单取消队列,串货服务用
const CHANNEL_ORDER_CANCELED_QUEUE = "channel_order_canceled_queue"
//订单发货route key
const ORDER_DELIVERED_KEY string = "delivered"
//订单发货队列,订单服务用
const ORDER_DELIVERED_QUEUE = "order_delivered_queue"
//订单发货队列,串货服务用
const CHANNEL_ORDER_DELIVERED_QUEUE = "channel_order_delivered_queue"
//订单收货route key
const ORDER_RECEIVED_KEY string = "received"
//订单取消队列,订单服务用
const ORDER_RECEIVED_QUEUE = "order_received_queue"
//订单取消队列,串货服务用
const CHANNEL_ORDER_RECEIVED_QUEUE = "channel_order_received_queue"
//订单退款route key
const ORDER_REFUNDED_KEY string = "refunded"
//订单退款队列,订单服务用
const ORDER_REFUNDED_QUEUE = "order_refunded_queue"
//订单退款队列,串货服务用
const CHANNEL_ORDER_REFUNDED_QUEUE = "channel_order_refunded_queue"
//订单完成route key
const ORDER_FINISHED_KEY string = "finished"
//订单退款队列,订单服务用
const ORDER_FINISHED_QUEUE = "order_finished_queue"
//订单退款队列,串货服务用
const CHANNEL_ORDER_FINISHED_QUEUE = "channel_order_finished_queue"
//订单等待创建key,订单服务用
const ORDER_ADD_KEY = "add"
//订单等待创建队列,订单服务用
const ORDER_ADD_QUEUE = "order_add_queue"
//订单延迟队列路由
const ORDER_DELAY_EXCHANGE string = "ex_order_delay"
//未支付订单过期自动取消key
const ORDER_AUTO_CANCEL_KEY string = "order_auto_cancel"
//未支付订单过期自动取消队列
const ORDER_AUTO_CANCEL_QUEUE string = "order_auto_cancel_queue"
//未支付订单过期自动取消队列,串货服务用
const CHANNEL_ORDER_AUTO_CANCEL_QUEUE string = "channel_order_auto_cancel_queue"
//订单发货后到期自动收货key
const ORDER_AUTO_RECEIVE_KEY string = "order_auto_received"
//订单发货后到期自动收货队列
const ORDER_AUTO_RECEIVE_QUEUE string = "order_auto_receive_queue"
//订单自动收货队列,串货服务用
const CHANNEL_ORDER_AUTO_RECEIVE_QUEUE string = "channel_order_auto_receive_queue"
//订单申请退款消息key
const ORDER_ASK_FOR_REFUND_KEY string = "order_ask_for_refund"
//订单申请退款消息队列
const ORDER_ASK_FOR_REFUND_QUEUE string = "order_ask_for_refund_queue"
//订单申请退款队列,串货服务用
const CHANNEL_ORDER_ASK_FOR_REFUND_QUEUE string = "channel_order_ask_for_refund_queue"
// --串货--
//串货订单路由
const CHANNEL_ORDER_EXCHANGE string = "ex_channel_order"
//串货订单请求支付route key
const CHANNEL_ORDER_ASKPAY_KEY = "channel_order_askpay"
//订单请求支付队列,串货服务用
const CHANNEL_ORDER_ASKPAY_QUEUE = "channel_order_askpay_queue"
//订单请求支付route key
const ORDER_ASKPAY_KEY = "order_askpay"
//订单请求支付队列,串货服务用
const ORDER_ASKPAY_QUEUE = "order_askpay_queue"
// --通知--
//发送接口请求路由
const SEND_MSG_EXCHANGE string = "ex_send_msg"
//发送接口请求路由route key
const SEND_API_MSG_KEY = "api_send_msg"
//发送接口请求队列,定时服务用
const SEND_MSG_QUEUE = "send_msg_queue"

Loading…
Cancel
Save