本文共 1589 字,大约阅读时间需要 5 分钟。
Producer生产者
Producer Group 作用如下:
(1)标识一类 Producer
(2)可以通过运维工具查询返个収送消息应用下有多个 Producer 实例
(3)发送分布式事务消息时,如果 Producer 中途意外宕机,Broker会主动回调Producer Group内的任意一台机器来确认事务状态。
Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取到Topic路由信息(Broket注册的IP地址),并向提供Topic服务的Master建立长连接,并且定时(默认30秒)向Master 发送心跳。
发送消息通过轮询队列的方式发送,每个队列接收平均的消息量。通过增加机器,可以水平扩展队列容量。另外也可以自定义方式选择发往哪个队列。注:另外多个队列可以部署在一台机器上,也可以分别部署在多台不同的机器上。
Consumer消费者
用来表示一个消费消息应用,一个Consumer Group下包含多个Consumer实例,可以是多台机器,也可以是多个进程,或者是一个进程的多个Consumer对象。一个Consumer Group下的多个Consumer以均摊 方式消费消息,如果设置为广播方式,那么这个Consumer Group下的每个实例都消费全量数据。
Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer取到Topic 路由信息,并向提供Topic服务的Master、Slave建立长连接,并定时向Master、Slave发送心跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,订阅规则由 Broker 配置决定。
假设有5个队列,2 个Consumer,那举第一个Consumer消费3个队列,第二Consumer 消费2个队列。这样即可达到平均消费的目的,可以水平扩展Consumer来提高消费能力。但是Consumer 数量要小于等于队列数量,如果Consumer超过队列数量,那举多余的Consumer将不能消费消息。
修改消费并行度方法:
(1)同一个Consumer Group下,通过增加Consumer实例数量来提高并行度,超过订阅队列数的Consumer实例无效。可以通过加机器,或者在已有机器启动多个进程的方式。
(2)提高单个 Consumer 的消费并行线程,通过修改设置 consumeThreadMin最小并发线程数和consumeThreadMax最大并发线程数来提高消费能力。
(3)通过设置Consumer的consumeMessageBatchMaxSize这个参数,默认是1,即一次只消费一条消息,例如设置为N,那么每次消费的消息数小于等于N。这样即可大幅度提高消费的吞吐量。
Tag消息过滤
最后讲解一些Tag,Tag是用于消息分类时使用的!但是只能做简单的过滤。那么能不能消费端A订阅这个Tag1就能确定Tag1消息发送到消费端A呢?答案是不能!根据前面的Consumer负载均衡策略,一个消息不能确定被发送到具体的某个消费者,所以只能做简单的过滤数据分类。转载地址:http://fpzfo.baihongyu.com/