0%

通过Docker安装RabbitMQ

RabbitMQ安装

1
2
3
4
5
6
# /Users/samtake/Documents/GitHub/www/rabbitmq目录可自定义,主要用于目录挂载
➜ ~ mkdir -p /Users/samtake/Documents/GitHub/www/rabbitmq
➜ ~ docker run -d --hostname rabbit-node1 --name rabbit-node1 -p 5672:5672 -p15672:15672 -v /Users/samtake/Documents/GitHub/www/rabbitmq:/var/lib/rabbitmq rabbitmq:management
Unable to find image 'rabbitmq:management' locally
management: Pulling from library/rabbitmq
...

报错提示,手动添加一下docker分享目录即可。

1
2
3
4
The path /www/rabbitmq
is not shared from OS X and is not known to Docker.
You can configure shared paths from Docker -> Preferences... -> File Sharing.
See https://docs.docker.com/docker-for-mac/osxfs/#namespaces for more info.
1
2
➜  ~ docker ps | grep rabbit
75d0ec6821fc rabbitmq:management "docker-entrypoint.s…" About a minute ago Up About a minute 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp rabbit-node1

浏览器打开登录rabbitmq, 入口:http://localhost:15672
默认用户名: guest 密码: guest

RabbitMQ的UI界面使用测试

RabbitMQ的工作原理和转发模式

名词解析:
exchange:消息交换机,决定消息按什么规则,路由到哪个队列。
queue:消息载体,每个消息都会被投到一个或多个队列。
binding:绑定,把exchange和queue按照路由规则绑定起来。
routing key:路由关键字,exchange根据这关键字来投递消息。
channel:消息通道,客户端的每个连接建立多个channnel。
producer:消息生产者,用于投递消息的程序。
consumer:消息消费者,用于接收消息的程序。

exchange的工作模式
fanout:类似广播,转发到所有绑定交换机的queue。
direct:类似单播,routing key 和bingding key完美匹配。
topic:类似组播,转发到符合通配符的queue。
headers:请求头与消息匹配,才能接收消息。

实战

1.配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
const (
// AsyncTransferEnable : 是否开启文件异步转移(默认同步)
AsyncTransferEnable = true
// RabbitURL : rabbitmq服务的入口url
RabbitURL = "amqp://guest:guest@127.0.0.1:5672/"
// TransExchangeName : 用于文件transfer的交换机
TransExchangeName = "uploadserver.trans"
// TransOSSQueueName : oss转移队列名
TransOSSQueueName = "uploadserver.trans.oss"
// TransOSSErrQueueName : oss转移失败后写入另一个队列的队列名
TransOSSErrQueueName = "uploadserver.trans.oss.err"
// TransOSSRoutingKey : routingkey
TransOSSRoutingKey = "oss"
)

2.生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
func init() {
// 是否开启异步转移功能,开启时才初始化rabbitMQ连接
if !config.AsyncTransferEnable {
return
}
if initChannel() {
channel.NotifyClose(notifyClose)
}
// 断线自动重连
go func() {
for {
select {
case msg := <-notifyClose:
conn = nil
channel = nil
log.Printf("onNotifyChannelClosed: %+v\n", msg)
initChannel()
}
}
}()
}

func initChannel() bool {
//1.判断channel是否已经创建
if channel != nil {
return true
}

//2.获得rabbitMQ的一个连接
conn, err := amqp.Dial(config.RabbitURL)
if err != nil {
log.Println(err.Error())
return false
}

//3.打开一个channel,用于消息的发布与接收等
channel, err = conn.Channel()
if err != nil {
log.Println(err.Error())
return false
}
return true
}

// Publish : 发布消息
func Publish(exchange, routingKey string, msg []byte) bool {
//1.判断channel是否正常
if !initChannel() {
return false
}

//2.执行消息发布
err := channel.Publish(
exchange,
routingKey,
false, // 如果没有对应的queue, 就会丢弃这条消息
false, //
amqp.Publishing{
ContentType: "text/plain", //明文编码
Body: msg,
},
)

if err != nil {
log.Println(err.Error())
return false
}
return true
}

3.消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
var done chan bool

// StartConsume : 开始监听队列,获取消息
func StartConsume(qName, cName string, callback func(msg []byte) bool) {
//1.通过channel.Consume获得消息信道
msgs, err := channel.Consume(
qName,
cName,
true, //自动应答
false, // 非唯一的消费者
false, // rabbitMQ只能设置为false
false, // noWait, false表示会阻塞直到有消息过来
nil)
if err != nil {
log.Fatal(err)
return
}

done = make(chan bool)

go func() {
// 2.循环读取channel的数据
for d := range msgs {
//3.调用callback方法来处理新的消息
processErr := callback(d.Body)
if processErr {
// TODO: 将任务写入另一个队列,用于异常情况的重试
}
}
}()

// 接收done的信号, 没有信息过来则会一直阻塞,避免该函数退出
<-done

// 关闭通道
channel.Close()
}

// StopConsume : 停止监听队列
func StopConsume() {
done <- true
}

demo