RabbitMQ安装 1 2 3 4 5 6 ➜ ~ mkdir -p /Users/samtake/Documents/GitHub/www/rabbitmq ➜ ~ docker run -d --hostname rabbit-node1 --name rabbit-node1 -p 5672:5672 -p15672:15672 -v /Users/samtake/Documents/GitHub/www/rabbitmq:/var/lib/rabbitmq rabbitmq:management Unable to find image 'rabbitmq:management' locally management: Pulling from library/rabbitmq ...
报错提示,手动添加一下docker分享目录即可。
1 2 3 4 The path /www/rabbitmq is not shared from OS X and is not known to Docker. You can configure shared paths from Docker -> Preferences... -> File Sharing. See https://docs.docker.com/docker-for-mac/osxfs/
1 2 ➜ ~ docker ps | grep rabbit 75d0ec6821fc rabbitmq:management "docker-entrypoint.s…" About a minute ago Up About a minute 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp rabbit-node1
浏览器打开登录rabbitmq, 入口:http://localhost:15672 默认用户名: guest 密码: guest
RabbitMQ的UI界面使用测试 RabbitMQ的工作原理和转发模式
名词解析:exchange
:消息交换机,决定消息按什么规则,路由到哪个队列。queue
:消息载体,每个消息都会被投到一个或多个队列。binding
:绑定,把exchange和queue按照路由规则绑定起来。routing key
:路由关键字,exchange根据这关键字来投递消息。channel
:消息通道,客户端的每个连接建立多个channnel。producer
:消息生产者,用于投递消息的程序。consumer
:消息消费者,用于接收消息的程序。
exchange的工作模式fanout
:类似广播,转发到所有绑定交换机的queue。direct
:类似单播,routing key 和bingding key完美匹配。topic
:类似组播,转发到符合通配符的queue。headers
:请求头与消息匹配,才能接收消息。
实战 1.配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 const ( // AsyncTransferEnable : 是否开启文件异步转移(默认同步) AsyncTransferEnable = true // RabbitURL : rabbitmq服务的入口url RabbitURL = "amqp://guest:guest@127.0.0.1:5672/" // TransExchangeName : 用于文件transfer的交换机 TransExchangeName = "uploadserver.trans" // TransOSSQueueName : oss转移队列名 TransOSSQueueName = "uploadserver.trans.oss" // TransOSSErrQueueName : oss转移失败后写入另一个队列的队列名 TransOSSErrQueueName = "uploadserver.trans.oss.err" // TransOSSRoutingKey : routingkey TransOSSRoutingKey = "oss" )
2.生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 func init () { // 是否开启异步转移功能,开启时才初始化rabbitMQ连接 if !config.AsyncTransferEnable { return } if initChannel () { channel.NotifyClose(notifyClose) } // 断线自动重连 go func () { for { select { case msg := <-notifyClose: conn = nil channel = nil log.Printf("onNotifyChannelClosed: %+v\n" , msg) initChannel() } } }() } func initChannel() bool { //1.判断channel是否已经创建 if channel != nil { return true } //2.获得rabbitMQ的一个连接 conn, err := amqp.Dial(config.RabbitURL) if err != nil { log.Println(err.Error()) return false } //3.打开一个channel,用于消息的发布与接收等 channel, err = conn.Channel() if err != nil { log.Println(err.Error()) return false } return true } // Publish : 发布消息 func Publish(exchange, routingKey string, msg []byte) bool { //1.判断channel是否正常 if !initChannel () { return false } //2.执行消息发布 err := channel.Publish( exchange, routingKey, false , // 如果没有对应的queue, 就会丢弃这条消息 false , // amqp.Publishing{ ContentType: "text/plain" , //明文编码 Body: msg, }, ) if err != nil { log.Println(err.Error()) return false } return true }
3.消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 var done chan bool // StartConsume : 开始监听队列,获取消息 func StartConsume(qName, cName string, callback func(msg []byte) bool) { //1.通过channel.Consume获得消息信道 msgs, err := channel.Consume( qName, cName, true , //自动应答 false , // 非唯一的消费者 false , // rabbitMQ只能设置为false false , // noWait, false 表示会阻塞直到有消息过来 nil) if err != nil { log.Fatal(err) return } done = make(chan bool) go func () { // 2.循环读取channel的数据 for d := range msgs { //3.调用callback方法来处理新的消息 processErr := callback(d.Body) if processErr { // TODO: 将任务写入另一个队列,用于异常情况的重试 } } }() // 接收done 的信号, 没有信息过来则会一直阻塞,避免该函数退出 <-done // 关闭通道 channel.Close() } // StopConsume : 停止监听队列 func StopConsume () { done <- true }
demo