diff --git a/conn.go b/conn.go new file mode 100644 index 0000000..cc54ad5 --- /dev/null +++ b/conn.go @@ -0,0 +1,62 @@ +package rabbitmq + +import ( + "errors" + "log" + + "github.com/streadway/amqp" +) + +var Conn *amqp.Connection +var Chan *amqp.Channel + +func Connect(host, port, username, pwd, vhost string) error { + + log.Println("RabbitMQ connecting...") + var err error + if username != "" && pwd != "" && host != "" && port != "" { + if vhost == "" { + vhost = "/" + } + Conn, err = amqp.Dial("amqp://" + username + ":" + pwd + "@" + host + ":" + port + "/" + vhost) + if err != nil { + log.Println("ERROR", "Failed to connect to RabbitMQ", err) + } else { + log.Println("RabbitMQ has connected") + } + // defer conn.Close() + } else { + log.Println("ERROR", "RabbitMQ connection params errors") + err = errors.New("RabbitMQ connection params errors") + } + + return err +} + +func Channel() error { + var err error + Chan, err = Conn.Channel() + if err != nil { + log.Println("ERROR", "Failed to open a channel") + } else { + log.Println("open a channel") + } + // defer ch.Close() + return err +} + +func CloseChannel() error { + err := Chan.Close() + if err == nil { + log.Println("RabbitMQ channel closed") + } + return err +} + +func CloseConn() error { + err := Conn.Close() + if err == nil { + log.Println("RabbitMQ connection closed") + } + return err +} diff --git a/message.go b/message.go new file mode 100644 index 0000000..27f5136 --- /dev/null +++ b/message.go @@ -0,0 +1,55 @@ +package rabbitmq + +import ( + "encoding/json" +) + +/** + * 发送一条api通知消息 + * @param exchange 交换机名 + * @param extype 消息类型 + * @param route 路由key + * @param body 内容 + */ +func SendNotice(site_id, dbname, msg_id string) error { + msgData := map[string]interface{}{ + "site_id": site_id, + "dbname": dbname, + "data": map[string]string{ + "msg_id": msg_id, + }, + } + + msgDataJson, err := json.Marshal(msgData) + if err != nil { + return err + } + + return Send("direct", SEND_MSG_EXCHANGE, SEND_API_MSG_KEY, msgDataJson) + +} + +/** + * 发布订单已支付消息 + * @param exchange 交换机名 + * @param extype 消息类型 + * @param route 路由key + * @param body 内容 + */ +func SendOrderPayed(site_id, dbname, order_id, order_sn, price string) error { + + //发布订单已支付消息 + orderData := map[string]interface{}{ + "site_id": site_id, + "dbname": dbname, + "data": map[string]string{ + "order_id": order_id, + "order_sn": order_sn, + "price": price, + }, + } + + orderDataJson, _ := json.Marshal(orderData) + + return Send("direct", ORDER_EXCHANGE, ORDER_PAYED_KEY, orderDataJson) +} diff --git a/send.go b/send.go new file mode 100644 index 0000000..163fe7d --- /dev/null +++ b/send.go @@ -0,0 +1,105 @@ +package rabbitmq + +import ( + "github.com/streadway/amqp" +) + +/** + * 发送消息 + * @param exchange 交换机名 + * @param extype 消息类型 + * @param route 路由key + * @param body 内容 + */ +func Send(extype string, exchange string, route string, body []byte, extend ...bool) error { + + var durable, auto_delete, no_wait, internal bool = true, false, false, false + + if len(extend) > 0 { + durable = extend[0] + } + if len(extend) > 1 { + auto_delete = extend[1] + } + if len(extend) > 2 { + no_wait = extend[2] + } + if len(extend) > 3 { + internal = extend[3] + } + var err error + err = Chan.ExchangeDeclare( + exchange, // name + extype, // type + durable, // durable + auto_delete, // auto-deleted + internal, // internal + no_wait, // no-wait + nil, // arguments + ) + if err != nil { + return err + } + + err = Chan.Publish( + exchange, // exchange + route, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "text/plain", + Body: body, + }) + return err +} + +/** + * 发送消息 + * @param exchange 交换机名 + * @param extype 消息类型 + * @param route 路由key + * @param body 内容 + * @param delay 延迟多少毫秒 + */ +func SendDelay(extype string, exchange string, route string, body []byte, delay int64, extend ...bool) error { + + var durable, auto_delete, no_wait, internal bool = true, false, false, false + + if len(extend) > 0 { + durable = extend[0] + } + if len(extend) > 1 { + auto_delete = extend[1] + } + if len(extend) > 2 { + no_wait = extend[2] + } + if len(extend) > 3 { + internal = extend[3] + } + var err error + err = Chan.ExchangeDeclare( + exchange, // name + "x-delayed-message", // type + durable, // durable + auto_delete, // auto-deleted + internal, // internal + no_wait, // no-wait + amqp.Table{"x-delayed-type": extype}, // arguments + ) + if err != nil { + return err + } + + err = Chan.Publish( + exchange, // exchange + route, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + Headers: amqp.Table{"x-delay": delay}, + ContentType: "text/plain", + Body: body, + }) + return err +} diff --git a/send_test.go b/send_test.go new file mode 100644 index 0000000..693b1de --- /dev/null +++ b/send_test.go @@ -0,0 +1,19 @@ +package rabbitmq + +import ( + "testing" +) + +func Test_Send(t *testing.T) { + err := Connect("127.0.0.1", "5672", "tetele", "fly123456@", "ttl") + t.Log(err) + + err = Channel() + t.Log(err) + + body := []byte("order is expire") + // err = Send("direct", "ex_order_delay", "order_expire", body) + err = SendDelay("direct", "ex_order_delay", "order_expire", body, 5000) + + t.Log(err) +} diff --git a/variable.go b/variable.go new file mode 100644 index 0000000..e71036a --- /dev/null +++ b/variable.go @@ -0,0 +1,139 @@ +package rabbitmq + +// --订单-- +//订单路由 +const ORDER_EXCHANGE string = "ex_order" + +//已创建订单route key +const ORDER_CREATED_KEY string = "created" + +//订单已创建队列,订单服务用 +const ORDER_CREATED_QUEUE = "order_created_queue" + +//已创建订单队列,串货服务用 +const CHANNEL_ORDER_CREATED_QUEUE = "channel_order_created_queue" + +//已支付订单route key +const ORDER_PAYED_KEY string = "payed" + +//订单已支付队列,订单服务用 +const ORDER_PAYED_QUEUE = "order_payed_queue" + +//订单已支付队列,串货服务用 +const CHANNEL_ORDER_PAYED_QUEUE = "channel_order_payed_queue" + +//订单状态更新route key +const ORDER_STATUS_CHANGE_KEY string = "status_change" + +//订单状态更新队列,订单服务用 +const ORDER_STATUS_UPDATE_QUEUE = "order_status_update_queue" + +//订单状态更新,串货服务用 +const CHANNEL_ORDER_STATUS_CHANGE_QUEUE = "channel_order_update_status" + +//订单取消route key +const ORDER_CANCELED_KEY string = "canceled" + +//订单取消队列,订单服务用 +const ORDER_CANCELED_QUEUE = "order_canceled_queue" + +//订单取消队列,串货服务用 +const CHANNEL_ORDER_CANCELED_QUEUE = "channel_order_canceled_queue" + +//订单发货route key +const ORDER_DELIVERED_KEY string = "delivered" + +//订单发货队列,订单服务用 +const ORDER_DELIVERED_QUEUE = "order_delivered_queue" + +//订单发货队列,串货服务用 +const CHANNEL_ORDER_DELIVERED_QUEUE = "channel_order_delivered_queue" + +//订单收货route key +const ORDER_RECEIVED_KEY string = "received" + +//订单取消队列,订单服务用 +const ORDER_RECEIVED_QUEUE = "order_received_queue" + +//订单取消队列,串货服务用 +const CHANNEL_ORDER_RECEIVED_QUEUE = "channel_order_received_queue" + +//订单退款route key +const ORDER_REFUNDED_KEY string = "refunded" + +//订单退款队列,订单服务用 +const ORDER_REFUNDED_QUEUE = "order_refunded_queue" + +//订单退款队列,串货服务用 +const CHANNEL_ORDER_REFUNDED_QUEUE = "channel_order_refunded_queue" + +//订单完成route key +const ORDER_FINISHED_KEY string = "finished" + +//订单退款队列,订单服务用 +const ORDER_FINISHED_QUEUE = "order_finished_queue" + +//订单退款队列,串货服务用 +const CHANNEL_ORDER_FINISHED_QUEUE = "channel_order_finished_queue" + +//订单等待创建key,订单服务用 +const ORDER_ADD_KEY = "add" + +//订单等待创建队列,订单服务用 +const ORDER_ADD_QUEUE = "order_add_queue" + +//订单延迟队列路由 +const ORDER_DELAY_EXCHANGE string = "ex_order_delay" + +//未支付订单过期自动取消key +const ORDER_AUTO_CANCEL_KEY string = "order_auto_cancel" + +//未支付订单过期自动取消队列 +const ORDER_AUTO_CANCEL_QUEUE string = "order_auto_cancel_queue" + +//未支付订单过期自动取消队列,串货服务用 +const CHANNEL_ORDER_AUTO_CANCEL_QUEUE string = "channel_order_auto_cancel_queue" + +//订单发货后到期自动收货key +const ORDER_AUTO_RECEIVE_KEY string = "order_auto_received" + +//订单发货后到期自动收货队列 +const ORDER_AUTO_RECEIVE_QUEUE string = "order_auto_receive_queue" + +//订单自动收货队列,串货服务用 +const CHANNEL_ORDER_AUTO_RECEIVE_QUEUE string = "channel_order_auto_receive_queue" + +//订单申请退款消息key +const ORDER_ASK_FOR_REFUND_KEY string = "order_ask_for_refund" + +//订单申请退款消息队列 +const ORDER_ASK_FOR_REFUND_QUEUE string = "order_ask_for_refund_queue" + +//订单申请退款队列,串货服务用 +const CHANNEL_ORDER_ASK_FOR_REFUND_QUEUE string = "channel_order_ask_for_refund_queue" + +// --串货-- +//串货订单路由 +const CHANNEL_ORDER_EXCHANGE string = "ex_channel_order" + +//串货订单请求支付route key +const CHANNEL_ORDER_ASKPAY_KEY = "channel_order_askpay" + +//订单请求支付队列,串货服务用 +const CHANNEL_ORDER_ASKPAY_QUEUE = "channel_order_askpay_queue" + +//订单请求支付route key +const ORDER_ASKPAY_KEY = "order_askpay" + +//订单请求支付队列,串货服务用 +const ORDER_ASKPAY_QUEUE = "order_askpay_queue" + +// --通知-- +//发送接口请求路由 +const SEND_MSG_EXCHANGE string = "ex_send_msg" + +//发送接口请求路由route key +const SEND_API_MSG_KEY = "api_send_msg" + +//发送接口请求队列,定时服务用 +const SEND_MSG_QUEUE = "send_msg_queue"