ZGC简介

1. 介绍

java11介绍了一个优秀的垃圾回收实现ZGC,ZGC是一款以低延迟为首要目标、基于paper内存布局、不设分代的并发垃圾收集器。

相比于其他垃圾回收器,ZGC有着不同的特点:

  1. 低时延,在垃圾回收过程中,只有与root有关的处理才会STW,时延不是随着堆大小而增加,低时延是最大的特点,也是ZGC的设计目标

  2. 内存分区管理

    支持不同的分区粒度,有小页面、中页面、大页面之分。

  3. 不分代的垃圾回收

    在标记阶段,对全量内存进行标记,在回收的时候,选择回收价值更大的页面

  4. Colored Pointer,在引用上利用标记位来表示对象不同的状态,并利用多视图映射将不同的虚拟地址映射岛同一物理地址

  5. 使用Load-Barrier,来完成并发标记和并发重定位

  6. 支持NUMA,进来把对象分配在访问速度比较快的地方

2. 内存管理

ZGC的内存管理和G1类似,使用分区管理。在G1中,一个分区称为一个region,大小是固定的。在ZGC中分区叫做page,分为小、中、大

3. ZGC流程

ZGC回收流程包括6个阶段,可分为标记和重定位两个过程

标记的方法很多种,1. 弄个map,key为对象地址,value为对象状态;2. 在对象头,记录对象状态。3. ZGC采用的是 对引用进行标记,把对象状态,记录在引用的几个bit上,即“Colored Pointer”

4. 标记过程

  1. 开始标记,STW,这个阶段,GC线程完成对根引用的标记,由于跟引用数量通常很少,所以这个阶段很短
  2. 并发标记,GC线程从根引用开始深度优先遍历对象图,标记达到的每一个对象。工作线程的Load-Barrier检测到未标记的引用时,也会标记。还会对上一次GC中没有完成重映射的引用,进行重映射,即Remap。
  3. 结束标记,也是STW的。处理一些边缘情况,比如弱引用,这个阶段很快,时间较短

标记过程中,除了使用“Colored Pointer”进行标记外,针对每一个page还会使用liveMap记录page里哪些对象时活得,以及所占空间,为下一个阶段中 选择合适回收page和存活对象重定位做准备。

指针表示一个字节在虚拟内存中的具体地址,我们42位表示指针的地址,4位来表示这个指针的一些属性(Finalizable,Remapped,Marked,Marked)用于ZGC标记

  • Finalizable标记只能通过终结器访问对象
  • Remapped 表示这个引用是最新的,只想对象的当前位置。在load-barrier中,完成重映射后,会设置为Remapped
  • Marked0和Marked1 标记可访问的对象,在ZGC标记过程中,使用这两交替访问

ZGC使用42位表示地址,所以ZGC支持堆内存最大位4TB,JDK15中,扩展到了16T

5. 重定位过程

在完成了标记对象标记后,需要进行具体的回收工作了,在ZGC中,使用复制算法来进行垃圾回收。重定位过程会把需要垃圾回收的page中存活的对象移到一个新的page中,page中存活对象移动完成后,page的回收就完成了。

重定位包含以下阶段:

  • 准备重定位,这个过程是并发的,主要是选择想进行回收的page,放入到重定位集中。

  • 开始重定位,这个过程是stw的,将relocation set中所有根引用重定位,并且更新引用

  • 重定位是并发的,根据标记得到的liveMap,GC线程对relocation set中所有存活的对象进行重定位,将新旧地址之间的映射存储在转发表中,应用线程在访问到GC线程还没来得及重定位的对象时,也会给对象进行重定位

6. 重映射过程

重定位只是在转发表中保存了新旧地址的映射,并没有对对象的引用进行更新

当访问对象时,使用的引用地址还是错误的地址,在ZGC中,在Load-Barrier中,根据引用的状态和转发表中的记录,进行引用地址的修正,确保每次能访问到正确对象,并将对象设置为Remapped

ZGC将这个指针修正的过程被称为指针的自愈

其余没有被Load-Barrier,将在下一个标记阶段进行

因为在引用修正的过程中,放在了Load-Barrier中,由应用线程处理,所以,在一次GC中,我们只需要在标记过程中遍历一次对象树。

7. Load-Barrier

Load Barrier可以翻译成读屏障,即写屏障。ZGC中,标记和移动阶段,每次【从堆里对象的引用类型中读取一个指针】的时候,都会触发一个Load-Barrier。(比如obj.fieldA)

8. 并发标记过程详解

并发标记时,GC线程和应用线程同时工作,需要特殊处理,防止出现漏标和错标的i情况。错标会出现浮动垃圾,影响不大,会在下一次垃圾回收中回收掉。漏标会让一些可达对象被当作垃圾处理,影响程序正确性,使用 Load-Barrier避免漏标

9. 如何进行页面回收的选择

选择分为两步:筛选可以被回收的页面,从中选择垃圾比较多的页面加入relocation set

  1. 筛选所有可以被回收的页面

    如果页面时本次垃圾回收启动后新分配的页面,就无需回收

    如果页面在标记过程中没标记过,即没有存活对象,可以直接放回页面缓存,重用。

    如果页面是大页面,则不回收,因为大页面只有一个对象,不能回收

    如果页面是中页面、或者小页面,则只有页面垃圾所占空间超过页面空间的25%时,才可以被回收

  2. 选择垃圾比较多的页面加入relocation set

    排序,根据页面的liveMap来计算活跃内存的大小,按照页面活跃内存数从小到大排序

    选择排序靠前的页面,加入relocation set

10. 总结

​ ZGC是一款以低延迟为首要目标,基于page内存布局的、不设分代的、使用了Load-Barrier、Colored Poiner和内存多视图映射等技术的并发垃圾收集器。stw时间只与根节点的数目有关,其余标记和处理过程都是并发的,因此可以保证时延时间基本不随着堆大小变化。但是部分标记和重映射工作由Load-Barrier来完成,吞吐量有所下降

Kafka消费者消费消息(高级API)

1. 介绍

Kafka消费者有三种API,Scala实现的两种消费者API 叫做高级API和低级API,新版本的消费者采用Java重新实现叫做 新API。 但不管采用什么版本实现,消费者消费消息的主要工作没有太大变化 , 比如为消费者分配分区、拉取线程拉取消息客户端消费消息、更新拉取状态、提交偏移量 。 本文介绍内容主要是高级API

消费者拉取钱程拉取每个分区的数据,会将分区的消息集包装成一个数据块,放入分区信息的队列中 。 而每个队列都对应一个消息流,消费者客户端选代消息流,实际上是迭代每个数据块中消息集的每条消息 。
如图所示,一个队列包含多个数据块,每个数据块对应一个分区的消息集, 一个消息集包含
多条消息 。 消费者迭代器封装了迭代获取消息的逻辑,客户端不需要面向数据
块、消息集这些内部对象,只需要对消费者迭代器循环获取消息即可 。

kafka消费者消费流程

二、消费者迭代消费消息

​ 消费者迭代器生成包含消息的迭代器,首先弹出队列的每个数据块,然后获取数据块对应的消息集,最后迭代消息集中的每条消息 。 客户端迭代的消息是队列的所有数据块,而不是一个数据块。 所以在迭代过程中,要确保读取完一个数据块后,接着读取下一个数据块。 也就是说,消费者迭代器是:所有数据块通过消息集组成的消息迭代器 。

​ 消费者迭代器实现了 Java的 Iterator接口,必须重载hasNext ()和 next()方法。 hasNext ()方法会用来判断迭代器是杏结束, next ()方法每调用一次就指向迭代器的下一个元素 。 迭代的过程因为最上层数据结构是包含数据块的阻塞队列,所以从队列中弹出一个数据块就已经足够调用很多次next()了 。只有当前数据块的消息集都遍历完成后,才会从队列中弹出新的数据块

​ 在迭代的过程中,可能 多 次调用 next()方法都还是在同一数据块的同一个消息集 中, 所以迭代器妥保存当前的数据块、当前的 消息集。 如果当前消息 集没有下一个元素,则 需要同时更新这两个变量 。 因为一个数据块对应一个消息集 ,一旦当前消息集没有元素 了, 说明 这个数据块也.已经迭代完毕。

三、偏移量

3.1 两种偏移量

消费者的“拉取线程” 拉取消息后会更新“拉取状态 ” ,对应的 “消费线程”获取消息后也要更新相关的“消费状态” 。 (准确地说,消费消息的对象是一个迭代器而不是钱程。 这里为了和拉取线程相对应,故叫作消费线程。 拉取状态对应分区信息、对象的拉取偏移量 , 表示消费者已经拉取的分区位置 ; 消费状态对应了消费偏移量,表示消费者已经消费完成的偏移量 。
如 图 3-23所示 , 拉取消息的线程和消费消息 的线程是两个独立的工作模块,前者通过分区信息对象的阻塞队列将消息传给消费消息的线程完成数据的传输 。 消息拉取后只有被消费线程真正消后,才会更新消费状态 。 也就是说,“拉取线程更新拉取偏移量,消费线程更新消费偏移量”,

具体步骤如下 :
(1) 消费者 的拉取线程从服务端拉取分区 的消息 。
(2)拉取到分区消息后 ,就更新分区信息对象的拉取偏移量 。
(3)将分区数据的消 息集封装成数据块 。
(4) 客户端循环迭代数据块的消息集 。
(5) 消费完一条消息后,就更新分区信息对象的消费偏移量
(6)消息流中的每一条消息返回给消费者客户端应用程序 。

拉取线程和消费线程分别更新分区信息的状态

3.2 提交偏移量

在旧版本中每个分区的偏移量都保存到ZK中 ,分区信息的拉取偏移初始时从ZK读取,然后在拉取消息后更新 。 同样,消费偏移盘初始时也是从ZK读取,然后在消费消息后更新。 消费者消费了新的消息后,还应该及时地将消费进度(即分区信息的消费偏移盘)保存到ZK中 。 每个分区都要和ZK产生一次交互,况且还要周期性地写人,这对ZK来说是个不小的负担。 在新版本中把偏移量像普通消息一样写入Kafka集群的内部主题。 而且正像消息会源源不断地写到集群一样,记录偏移量也是周期性的 。 Kafka支持高吞吐量的消息写入,对于偏移量的记录当然也不在话下 。

消费者提交偏移量是为了保存分区的消费进度 。 因为Kafka保证同一个分区只会分配给消费组中的唯一消费者,所以即使发生再平衡后,分区和消费者的所有权关系发生变化,新消费者也可以接着上一个消费者记录的偏移盘位置继续消费消息 。
但是消费者即使记录了分区的偏移量,仍然无法解决消息被重复消费的问题。 例如,消费者每隔 10秒提交一次偏移量,在 10秒时提交的偏移量是 100 ,下一次提交的可点是20秒。 在20秒之前,消费者又消费了 30条消息,然后消费者突然挂掉了 。 由于偏移量现在仍然停留在 10。这个位置,因此新的消费者2也只会从 10。这个位置继续消费,从而会重复处理偏移量为 100之后的30条消息 。通常消息被重复处理是可以接受的,至少不会出现消息丢失这种不可接受的问题。 定时提交偏移量的周期时间越长,消息被重复消费的数据量就越多。 客户端可以将这个周期时间设置得更短,来减少重复消费的消息量。 当然也不能太短,否则会导致客户端和保存偏移量的存储系统产生大量的网请求 。

消费者提交偏移量到Kafka的 内部主题,首先要确定连接哪个或者哪些服务端节点 。 回顾一下 ,
生产者发送消息时会根据分区的主副本分组 , 和多个节点者建立连接 ; 消费者分配多个分区 ,也要根
据分区的主副本分组, 和多个节点建立连接。 而消费者提交所有分区 的偏移量时, 实际上只和-个服
务端节点建立连接。 同样要处理多个分区,为什么普通消息需要多个连接,而偏移量只需要一个连接?
如图 3-24所示,目标节点指的是分区的主副本节点,我们给 出 了偏移量的多种连接方案 。
(1)如果不同分区的偏移盐写到了不同的节点,消费者分配了多个分区,当要读取不同分区的偏
移盘时,就得连接不同的节点才可以获得完整的数据。
(2)如果能让所有分区的偏移草’数据只保存在一个节点,消费者就只需要 同一个节点通信 。 但因
为消费者和分区的关系是变化的, 即使保证这一次分区在一个节点上, 也无法保证下一次仍然在同一
个节点 。
(3)如果消费组所有消费者所有分区的偏移量都保存在一个节点,就可以解决第二种方式的 问题 。

实际上 , 消费者 的分区偏移盏’要保存在哪个节点,跟消费者所属 的消费组有关系 。 只要保证
消费组级别的偏移i量在一个节点上, 即使消费者和分区的关系发生变化 , 也能够保证消费者访问新分
配的分区时 , 只需要访问一个节点 。

同一个消费组的所有消费者,以内部主题形式提交所有分区的偏移盘到一个目标节点,这个内部主题和普通消息的主题一样也会有多个分区。 如果只有一个分区 ,所有消费组都只能提交到唯一的节点,将所有读写请求都压到一个节点的相同问题。 而如果有多个分区,并且以消费组作为分区的分布条件,不同消费组提交到的偏移量有可能是不同的节点,就可以分散偏移盘读写的压力 。

总结下客户端需要确定服务端节点的几个场景。

  • 生产者发送消息时,直接在客户端决定消息要发送给哪个分区,这一步不向服务端发送请求 。
  • 消费者拉取管理器的线程向服务端发送主题元数据请求,获取包含了主副本等信息的所有分区元数据,消费者拉取线程才能确定要连接哪些服务端节点 。
  • 提交偏移量虽然有点像生产者的发送消息,都是写数据,但也需要和消费者一样,获取分区的主副本作为偏移管理器,才能确定提交到哪个节点 。

生产者、消费者、偏移量和Kafka集群的网络连接

四、连接偏移管理器

前面我们分析的拉取偏移方法和提交偏移量方法,都需要和偏移主-管理器通信。 在这之前,消费者需要通过 channelToOffsetManage ()方法向服务端任意一个节点发送“消费组的协调者请求”,来获取消费组对应的协调节点 ,即偏移量管理器节点 。服务端处理消费组的协调者请求,实际上也是通过查询主题的元数据来完成的 。 不过和 中返回主题元数据,然后还要在客户端继续处理( 比如获取存在主副本的分区)不同,这里在服务端完成“选择消费组对应内部主题的分区的主副本节点”,然后直接返回这个协调节点给客户端。 也就是说客户端发送消费组的协调者请求,服务端返回的就是消费组的协调节点 。

五、服务端处理提交偏移量的请求

协调节点会将消费者的偏移量提交请GroupCoordlnator类的 handleCommitOffsets ()方法处理,其中参数offsetMetadata表示分配给消费者的所有分区消费进度 。

写入偏移消息会调用 RepllcaManager .appendMessages ()方法,将消息集追加到本地日志文件,
并且会把分区和对应的偏移量保存在协调节点的缓存中 。 目的是:再平衡后如果其他消费者需要读取
分区的偏移毡 , 在连接上协调节点后,可以直接读取缓存 , 而不需要从日志文件中读取。

六、再平衡和分区分配

使用高级APl ,每个消费者进程启动时都会创建一个消费者连接器,并在ZK中注册消费者成员变
化、分区变化的监昕器。三旦监昕器注册的事件被触发,就会调用ZKRebalanceListener的再平衡方
法,为消费组的所有消费者重新分配分区 。 为了保证整个消费组分区分配算法的一致性,当一个消费
者触发再平衡时,该消费组内的所有消费者会同时触发再平衡。 如图(左)所示,第一个消费者
加入消费组触发再平衡,这时消费组只有一个消费者,所有的分区都分配给第一个消费者 。 如图
(右)所示,第二个消费者力n人同一个消费组,会触发所有消费者的再平衡,即第一个消费者和第二
个消费者都会再平衡。

消费组的消费者再平衡

由于每个消费者的再平衡都是独立的进程,消费者之间并不知道其他消费者的再平衡最后有没有
成功。 可能有些消费者的再平衡成功了,有些却失败了,就会导致本来分配给这个消费者的分区,因为
再平衡失败而无法被消费,但是其他消费者又都无法知 l斑。 解决这个问题的方法是:在服务端为每个消
费组都选举一个协调节点,让它负责某个消费组中所有消费者的协调了作。 另外,消费者提交分区的偏
移i卫;也是写到协调节点上的 。 实际上,消费者客户端发送给服务端的请求“只要和消费组相关,都会被
协调节点处理” 。 如图所示,消费者执行再平衡和提交偏移ill都直接和协调者交互,具体步骤如下。
(I )每个消费者触发再平衡时都和协调者联系,由协调者执行全局的分区分配 。
(2)协调者分配完成后,将分区分配给每个消费者 。
(3)每个消费者收到任务列表后,启动拉取钱程,拉取对应分区的消息,并更新拉取状态 。
(4)消费者周期性提交分区的偏移量给协调者,协调者将分区偏移写到内部主题 。

消费者消费分区和提交偏移量都经过协调者

Nginx面试常问问题

1. nginx的负载均衡算法有哪些?

Nginx的upstream模块支持6种方式的负载均衡策略(算法):轮询(默认方式)、weight(权重方式)、ip_hash(依据ip分配方式)、least_conn(最少连接方式)、fair(第三方提供的响应时间方式)、url_hash(第三方通过的依据URL分配方式)。

2. ngxin的作用

web 服务.

负载均衡 (反向代理)

web cache(web 缓存)

动静分离

3. nginx的优点:

模块化、事件驱动、异步、非阻塞、多进程单线程

高并发。静态小文件

占用资源少。2万并发、10个线程,内存消耗几百M。

功能种类比较多。web,cache,proxy。每一个功能都不是特别强。

支持epoll模型,使得nginx可以支持高并发。

nginx 配合动态服务和Apache有区别。(FASTCGI 接口)

利用nginx可以对IP限速,可以限制连接数。

配置简单,更灵活。

4. nginx为什么抗高并发

Nginx的高并发得益于其采用了epoll模型

nginx采用epoll模型,异步非阻塞。对于Nginx来说,把一个完整的连接请求处理都划分成了事件,一个一个的事件。比如accept(), recv(),磁盘I/O,send()等,每部分都有相应的模块去处理,一个完整的请求可能是由几百个模块去处理。真正核心的就是事件收集和分发模块,这就是管理所有模块的核心。只有核心模块的调度才能让对应的模块占用CPU资源,从而处理请求。拿一个HTTP请求来说,首先在事件收集分发模块注册感兴趣的监听事件,注册好之后不阻塞直接返回,接下来就不需要再管了,等待有连接来了内核会通知你(epoll的轮询会告诉进程),cpu就可以处理其他事情去了。一旦有请求来,那么对整个请求分配相应的上下文(其实已经预先分配好),这时候再注册新的感兴趣的事件(read函数),同样客户端数据来了内核会自动通知进程可以去读数据了,读了数据之后就是解析,解析完后去磁盘找资源(I/O),一旦I/O完成会通知进程,进程开始给客户端发回数据send(),这时候也不是阻塞的,调用后就等内核发回通知发送的结果就行。整个下来把一个请求分成了很多个阶段,每个阶段都到很多模块去注册,然后处理,都是异步非阻塞。异步这里指的就是做一个事情,不需要等返回结果,做好了会自动通知你。

5. 为什么nginx的总体性能比Apache高?

nginx使用最新的epoll和kqueue网络IO模型,而Apache使用传统的select模式。

目前Linux下能够承受高并发访问的squid、Memcached 都采用的是epoll网络IO模型。

6. nginx如何处理请求?

nginx在启动时会以daemon形式在后台运行,采用多进程+异步非阻塞IO事件模型来处理各种连接请求。多进程模型包括一个master进程,多个worker进程,一般worker进程个数是根据服务器CPU核数来决定的。master进程负责管理Nginx本身和其他worker进程

在nginx多进程中,每个worker都是平等的,因此每个进程处理外部请求的机会权重都是一致的。

Master进程读取并验证配置文件nginx.conf;管理worker进程;而每一个Worker进程都维护一个线程(避免线程切换),处理连接和请求;注意Worker进程的个数由配置文件决定,一般和CPU个数相关(有利于进程切换),配置几个就有几个Worker进程。worker进程中有一个ngx_worker_process_cycle()函数,执行无限循环,不断处理收到的来自客户端的请求,并进行处理,那就要讲讲多进程模型的处理流程了。

7. nginx多进程模式的处理流程

首先,master进程一开始就会根据我们的配置,来建立需要listen的网络socket fd,然后fork出多个worker进程。

其次,根据进程的特性,新建立的worker进程,也会和master进程一样,具有相同的设置。因此,其也会去监听相同ip端口的套接字socket fd。

然后,这个时候有多个worker进程都在监听同样设置的socket fd,意味着当有一个请求进来的时候,所有的worker都会感知到。这样就会产生所谓的“惊群现象”。为了保证只会有一个进程成功注册到listenfd的读事件,nginx中实现了一个“accept_mutex”类似互斥锁,只有获取到这个锁的进程,才可以去注册读事件。其他进程全部accept 失败。

最后,监听成功的worker进程,读取请求,解析处理,响应数据返回给客户端,断开连接,结束。因此,一个request请求,只需要worker进程就可以完成。

多进程模型的好处是进程之间独立,有一个worker坏了也不会影响逼得worker瘫痪

nginx还会将进程和cpu某一个核绑定,避免因为进程切换带来的缓存失效

8. 为什么要采用异步非阻塞?

多进程模型+异步非阻塞模型才是胜出的方案。单纯的多进程模型会导致连接并发数量的降低,而采用异步非阻塞IO模型很好的解决了这个问题;并且还因此避免的多线程的上下文切换导致的性能损失。

9. 如何避免所有的请求都被一个worker进程给竞争获取了

Nginx采用了一个是否打开accept_mutex选项的值,ngx_accept_disabled标识控制一个worker进程是否需要去竞争获取accept_mutex选项,进而获取accept事件

ngx_accept_disabled值:nginx单进程的所有连接总数的八分之一,减去剩下的空闲连接数量,得到的这个ngx_accept_disabled。
当ngx_accept_disabled大于0时,不会去尝试获取accept_mutex锁,并且将ngx_accept_disabled减1,于是,每次执行到此处时,都会去减1,直到小于0。不去获取accept_mutex锁,就是等于让出获取连接的机会,很显然可以看出,当空闲连接越少时,ngx_accept_disable越大,于是让出的机会就越多,这样其它进程获取锁的机会也就越大。不去accept,自己的连接就控制下来了,其它进程的连接池就会得到利用,这样,nginx就控制了多进程间连接的平衡了。

10. nginx挂了怎么办?

Keepalived+Nginx实现高可用,Keepalived是一个高可用解决方案,主要是用来防止服务器单点发生故障,可以通过和Nginx配合来实现Web服务的高可用。

第一:请求不要直接打到Nginx上,应该先通过Keepalived(这就是所谓虚拟IP,VIP)

第二:Keepalived应该能监控Nginx的生命状态(提供一个用户自定义的脚本,定期检查Nginx进程状态,进行权重变化,,从而实现Nginx故障切换)

  1. Nginx如何做到热部署?

修改配置文件nginx.conf后,重新生成新的worker进程,当然会以新的配置进行处理请求,而且新的请求必须都交给新的worker进程,至于老的worker进程,等把那些以前的请求处理完毕后,kill掉即可。

RocketMQ版的分布式事务实现

一、 什么是RocketMQ

RocketMQ 是阿里巴巴开源的分布式消息中间件。支持消息重试、延时消息、消息追踪、实时消息、分布式事务消息等。它里面有几个区别于标准消息中件间的概念,如Group、Topic、Queue等。系统组成则由Producer、Consumer、Broker、NameServer等。

二、RocketMQ 特点
  • 是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式等特点
  • Producer、Consumer、队列都可以分布式
  • Producer 向一些队列轮流发送消息,队列集合称为 Topic,Consumer 如果做广播消费,则一个 Consumer 实例消费这个 Topic 对应的所有队列,如果做集群消费,则多个 Consumer 实例平均消费这个 Topic 对应的队列集合
  • 能够保证严格的消息顺序
  • 支持拉(pull)和推(push)两种消息模式
  • 高效的订阅者水平扩展能力
  • 实时的消息订阅机制
  • 亿级消息堆积能力
  • 支持多种消息协议,如 JMS、OpenMessaging 等
  • 较少的依赖
三、分布式场景

假设 AB100块钱,同时它们不是同一个服务上。

目标:就是 A 减100块钱,B 加100块钱。

实际情况可能有四种:

1
2
3
4
5
6
7
1)就是A账户减100 (成功),B账户加100 (成功)

2)就是A账户减100(失败),B账户加100 (失败)

3)就是A账户减100(成功),B账户加100 (失败)

4)就是A账户减100 (失败),B账户加100 (成功)

这里 第1和第2 种情况是能够保证事务的一致性的,但是 第3和第4 是无法保证事务的一致性的。

那我们来看下RocketMQ是如何来保证事务的一致性的

四、RocketMQ分布式事务原理


rocketMq分布式

1、A服务先发送个Half Message给Brock端,消息中携带 B服务 即将要+100元的信息。 2、当A服务知道Half Message发送成功后,那么开始第3步执行本地事务。 3、执行本地事务(会有三种情况1、执行成功。2、执行失败。3、网络等原因导致没有响应) 4.1)、如果本地事务成功,那么Product像Brock服务器发送Commit,这样B服务就可以消费该message。 4.2)、如果本地事务失败,那么Product像Brock服务器发送Rollback,那么就会直接删除上面这条半消息。 4.3)、如果因为网络等原因迟迟没有返回失败还是成功,那么会执行RocketMQ的回调接口,来进行事务的回查。

从上面流程可以得知 只有A服务本地事务执行成功 ,B服务才能消费该message

然后我们再来思考几个问题?

1
为什么要先发送Half Message(半消息)

我觉得主要有两点

1
2
3
4
1)可以先确认 Brock服务器是否正常 ,如果半消息都发送失败了 那说明Brock挂了。

2)可以通过半消息来回查事务,如果半消息发送成功后一直没有被二次确认,那么就会回查事务状态。
什么情况会回查

也会有两种情况

1
2
3
4
1)执行本地事务的时候,由于突然网络等原因一直没有返回执行事务的结果(commit或者rollback)导致最终返回UNKNOW,那么就会回查。

2) 本地事务执行成功后,返回Commit进行消息二次确认的时候的服务挂了,在重启服务那么这个时候在brock端
它还是个Half Message(半消息),这也会回查。

特别注意: 如果回查,那么一定要先查看当前事务的执行情况,再看是否需要重新执行本地事务。

想象下如果出现第二种情况而引起的回查,如果不先查看当前事务的执行情况,而是直接执行事务,那么就相当于成功执行了两个本地事务。

1
为什么说MQ是最终一致性事务

通过上面这幅图,我们可以看出,在上面举例事务不一致的两种情况中,永远不会发生

1
A账户减100 (失败),B账户加100 (成功)

因为:如果A服务本地事务都失败了,那B服务永远不会执行任何操作,因为消息压根就不会传到B服务。

那么 A账户减100 (成功),B账户加100 (失败) 会不会可能存在的。

答案是会的

因为A服务只负责当我消息执行成功了,保证消息能够送达到B,至于B服务接到消息后最终执行结果A并不管。

那B服务失败怎么办?

如果B最终执行失败,几乎可以断定就是代码有问题所以才引起的异常,因为消费端RocketMQ有重试机制,如果不是代码问题一般重试几次就能成功。

如果是代码的原因引起多次重试失败后,也没有关系,将该异常记录下来,由人工处理,人工兜底处理后,就可以让事务达到最终的一致性。

参考资料:

https://www.cnblogs.com/weifeng1463/p/12889300.html

https://www.jianshu.com/p/3afd610a8f7d

理解Netty中的零拷贝(Zero-Copy)

一、理解零拷贝

“Zero-copy” describes computer operations in which the CPU does not perform the task of copying data from one memory area to another.

​ “零拷贝”是指计算机操作的过程中,CPU不需要为数据在内存之间的拷贝消耗资源。而它通常是指计算机在网络上发送文件时,不需要将文件内容拷贝到用户空间(User Space)而直接在内核空间(Kernel Space)中传输到网络的方式。


零拷贝

Zero Copy的模式中,避免了数据在用户空间和内存空间之间的拷贝,从而提高了系统的整体性能。Linux中的sendfile()以及Java NIO中的FileChannel.transferTo()方法都实现了零拷贝的功能,而在Netty中也通过在FileRegion中包装了NIO的FileChannel.transferTo()方法实现了零拷贝。

二、Netty中的零拷贝

在Netty中还有另一种形式的零拷贝,即Netty允许我们将多段数据合并为一整段虚拟数据供用户使用,而过程中不需要对数据进行拷贝操作,这也是我们今天要讲的重点。我们都知道在stream-based transport(如TCP/IP)的传输过程中,数据包有可能会被重新封装在不同的数据包中,即 粘包

粘包怎么出现的呢?

我们知道TCP/IP协议是目前的主流网络协议。它是一个多层协议,最下层是物理层,最上层是应用层(HTTP协议等),而做Java应用开发,一般只接触TCP以上,即传输层和应用层的内容。这也是Netty的主要应用场景。

TCP报文有个比较大的特点,就是它传输的时候,会先把应用层的数据项拆开成字节,然后按照自己的传输需要,选择合适数量的字节进行传输。什么叫”自己的传输需要”?首先TCP包有最大长度限制,那么太大的数据项肯定是要拆开的。其次因为TCP以及下层协议会附加一些协议头信息,如果数据项太小,那么可能报文大部分都是没有价值的头信息,这样传输是很不划算的。因此有了收集一定数量的小数据,并打包传输的Nagle算法(这个东东在HTTP协议里会很讨厌,Netty里可以用setOption(“tcpNoDelay”, true)关掉它)。

这么说可能不太好理解,我们举个例子吧:

发送时,我们这样分3次写入(‘|’表示两个buffer的分隔):

1
2
3
+-----+-----+-----+
| ABC | DEF | GHI |
+-----+-----+-----+

接收时,可能变成了这样:

1
2
3
+----+-------+---+---+
| AB | CDEFG | H | I |
+----+-------+---+---+

因此在实际应用中,很有可能一条完整的消息被分割为多个数据包进行网络传输,而单个的数据包对你而言是没有意义的,只有当这些数据包组成一条完整的消息时你才能做出正确的处理,而Netty可以通过零拷贝的方式将这些数据包组合成一条完整的消息供你来使用。而此时,零拷贝的作用范围仅在用户空间中。

三、 Netty中的ChannelBuffer

与Zero Copy直接相关的CompositeChannelBuffer类。 ###CompositeChannelBuffer类 CompositeChannelBuffer类的作用是将多个ChannelBuffer组成一个虚拟的ChannelBuffer来进行操作。为什么说是虚拟的呢,因为CompositeChannelBuffer并没有将多个ChannelBuffer真正的组合起来,而只是保存了他们的引用,这样就避免了数据的拷贝,实现了Zero Copy。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class CompositeChannelBuffer{

//components保存所有内部ChannelBuffer
private ChannelBuffer[] components;
//indices记录在整个CompositeChannelBuffer中,每个components的起始位置
private int[] indices;
//缓存上一次读写的componentId
private int lastAccessedComponentId;

public byte getByte(int index) {
//通过indices中记录的位置索引到对应第几个子Buffer
int componentId = componentId(index);
return components[componentId].getByte(index - indices[componentId]);
}

public void setByte(int index, int value) {
int componentId = componentId(index);
components[componentId].setByte(index - indices[componentId], value);
}

}

其中readerIndex既读指针和writerIndex既写指针是从AbstractChannelBuffer继承而来的;然后components是一个ChannelBuffer的数组,他保存了组成这个虚拟Buffer的所有子Buffer,indices是一个int类型的数组,它保存的是各个Buffer的索引值;最后的lastAccessedComponentId是一个int值,它记录了最后一次访问时的子Buffer ID。从这个数据结构,我们不难发现所谓的CompositeChannelBuffer实际上就是将一系列的Buffer通过数组保存起来,然后实现了ChannelBuffer 的接口,使得在上层看来,操作这些Buffer就像是操作一个单独的Buffer一样。

完全理解Netty零拷贝

参考:

https://my.oschina.net/plucury/blog/192577

https://github.com/code4craft/netty-learning/blob/master/posts/ch2-buffer.md

AQS介绍

一、AQS的简单介绍

​ AQS的全称是AbstractQueuedSynchronizer ,这个类在 java.util.concurrent.locks 包下面 AQS是基于FIFO的队列实现的,并且内部维护了一个状态变量state,通过原子更新这个状态变量state即可以实现加锁解锁操作。 ReentrantLock,Semaphore,其他的如 CyclicBarrier ,CountDownLatch,FutureTask SynchronousQueue等等皆是基于 AQS 的。

​ 当然,我们自己也能利用 AQS 非常轻松容易地构造出符合我们自己需求的同步器。

二、源码介绍
  1. 先看内部类,节点中保存着当前线程、前一个节点、后一个节点以及线程的状态等信息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
static final class Node {
// 共享模式
static final Node SHARED = new Node();
// 互斥模式
static final Node EXCLUSIVE = null;

// 标识线程已取消
static final int CANCELLED = 1;
// 标识后继节点需要唤醒
static final int SIGNAL = -1;
// 标识线程等待在一个条件上
static final int CONDITION = -2;
// 标识后面的共享锁需要无条件的传播(共享锁需要连续唤醒读的线程)
static final int PROPAGATE = -3;

// 当前节点保存的线程对应的等待状态
volatile int waitStatus;

// 前一个节点
volatile Node prev;

// 后一个节点
volatile Node next;

// 当前节点保存的线程
volatile Thread thread;

// 下一个等待在条件上的节点(Condition锁时使用)
Node nextWaiter;

// 是否是共享模式
final boolean isShared() {
return nextWaiter == SHARED;
}

// 获取前一个节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

// 节点的构造方法
Node() { // Used to establish initial head or SHARED marker
}

// 节点的构造方法
Node(Thread thread, Node mode) { // Used by addWaiter
// 把共享模式还是互斥模式存储到nextWaiter这个字段里面了
this.nextWaiter = mode;
this.thread = thread;
}

// 节点的构造方法
Node(Thread thread, int waitStatus) { // Used by Condition
// 等待的状态,在Condition中使用
this.waitStatus = waitStatus;
this.thread = thread;
}
}
  1. 关键的state
1
*// 控制加锁解锁的状态变量* private volatile int state; 

​ 用volatile关键字来修饰,因为是在多线程环境下操作,要保证它们的值修改之后对其它线程立即可见

state非常重要,所有的同步器都是通过stae来控制锁的状态。state的修改必须用cas

​ 例如:CountDownLatch,首先通过构造函数设置state = n,需要countDown()执行n次,await()才会返回。这里用到的就是state,每当执行一次countDown(),state就-1,知道所有的子线程执行完毕,state为0,await()方法就可以返回

  1. 实现一个AQS所需要实现的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 互斥模式下尝试获取锁
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 互斥模式下尝试释放锁
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
// 共享模式下尝试获取锁
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
// 共享模式下尝试释放锁
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
// 如果当前线程独占着锁,返回true
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}

这里用到了一种设计模式,即模板方法模式,自定义同步器时只需要重写上面几个方法即可,AQS中其他类都是final类型的,只有这几个方法能被其它类使用。那么重写了几个方法为什么可以实现同步器呢?这是因为AQS父类已经帮我买写好了一系列操作,包括入队,出队等。具体操作等后面介绍具体同步器的时候就可以知道。后面会介绍ReentrantLock,CountDownLatch等。

三、核心思想

​ AQS核心思想是:如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效 的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需 要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是用 CLH 队列锁实现 的,即将暂时获取不到锁的线程加入到队列中

Kafka介绍(二)

一、Kafka架构介绍

​ 一个典型的Kafka集群中包含若干Producer,若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。


kafka架构

二、Topics和Partition的关系

​ 主题(topic):一个 topic 里保存的是同一类消息,相当于对消息的分类,每个 producer 将消息发送到 kafka 中,都需要指明要存的 topic 是哪个,也就是指明这个消息属于哪一类。

 分区(partition):每个 topic 都可以分成多个 partition,每个 partition 在存储层面是 append log 文件。任何发布到此 partition 的消息都会被直接追加到 log 文件的尾部。为什么要进行分区呢?最根本的原因就是:kafka基于文件进行存储,当文件内容大到一定程度时,很容易达到单个磁盘的上限,因此,采用分区的办法,一个分区对应一个文件,这样就可以将数据分别存储到不同的server上去,另外这样做也可以负载均衡,容纳更多的消费者。

​ Topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的Topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。创建一个topic时,同时可以指定分区数目,分区数越多,其吞吐量也越大,但是需要的资源也越多,同时也会导致更高的不可用性,kafka在接收到生产者发送的消息之后,会根据均衡策略将消息存储到不同的分区中。因为每条消息都被append到该Partition中,属于顺序写磁盘,因此效率非常高

三、Consumer Group

​ 每一个consumer实例都属于一个consumer group,每一条消息只会被同一个consumer group里的一个consumer实例消费。(不同consumer group可以同时消费同一条消息)

  很多传统的message queue都会在消息被消费完后将消息删除,一方面避免重复消费,另一方面可以保证queue的长度比较少,提高效率。而如上文所将,Kafka并不删除 已消费的消息,为了实现传统message queue消息只被消费一次的语义,Kafka保证保证同一个consumer group里只有一个consumer会消费一条消息。与传统message queue不同的是,Kafka还允许不同consumer group同时消费同一条消息,这一特性可以为消息的多元化处理提供了支持。实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理。   

四、push vs pull

  kafka采用的是传统的数据由producer推送给broker,然后由consumer从broker拉去的机制

五、基本操作
  1. 启动ZooKeeper

    1
    bin/zookeeper-server-start.sh config/zookeeper.properties
  2. 启动Kafka

    1
    bin/kafka-server-start.sh config/server.properties
  3. 创建主题

    1
    2
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 
    --partitions 1 --topic Hello-World

    刚刚创建了一个名为 `Hello-World的主题,其中包含一个分区和一个副本因子

  4. 获取主题列表

    1
    bin/kafka-topics.sh --list --zookeeper localhost:2181
  5. 生产者发送消息

    1
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-World

    然后就可以在终端键入消息

  6. 消费者消费消息.

    1
    2
    bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-World
    --from-beginning

kafka入门介绍

一、消息系统介绍

​ 消息系统负责将数据从一个应用程序传输到另一个应用程序,因此应用程序可以专注于数据,但不担心如何共享它。 分布式消息传递基于可靠消息队列的概念。 消息在客户端应用程序和消息传递系统之间异步排队。 有两种类型的消息模式可用 - 一种是点对点,另一种是发布 - 订阅(pub-sub)消息系统。 大多数消息模式遵循 pub-sub

点对点模式

​ 生产者发送一条消息到queue,一个queue可以有很多消费者,但是一个消息只能被一个消费者接受,当没有消费者可用时,这个消息会被保存直到有 一个可用的消费者,所以Queue实现了一个可靠的负载均衡。


点对点

发布订阅模式

​ 在发布 - 订阅系统中,消息被保留在主题中。 与点对点系统不同,消费者可以订阅一个或多个主题并使用该主题中的所有消息。 在发布 - 订阅系统中,消息生产者称为发布者,消息使用者称为订阅者。


发布订阅

二、Kafka
2.1 介绍

Kafka专为分布式高吞吐量系统而设计。 Kafka往往工作得很好,作为一个更传统的消息代理的替代品。 与其他消息传递系统相比,Kafka具有更好的吞吐量,内置分区,复制和固有的容错能力,这使得它非常适合大规模消息处理应用程序。是一个分布式发布 - 订阅消息系统和一个强大的队列,可以处理大量的数据,并使您能够将消息从一个端点传递到另一个端点。

2.2 Kafka的优势

以下是Kafka的几个好处 -

  • 可靠性 - Kafka是分布式,分区,复制和容错的。
  • 可扩展性 - Kafka消息传递系统轻松缩放,无需停机。
  • 耐用性 - Kafka使用分布式提交日志,这意味着消息会尽可能快地保留在磁盘上,因此它是持久的。
  • 性能 - Kafka对于发布和订阅消息都具有高吞吐量。 即使存储了许多TB的消息,它也保持稳定的性能。

Kafka非常快,并保证零停机和零数据丢失。

Kafka可以在许多用例中使用。 其中一些列出如下 -

  • 指标 - Kafka通常用于操作监控数据。 这涉及聚合来自分布式应用程序的统计信息,以产生操作数据的集中馈送。
  • 日志聚合解决方案 - Kafka可用于跨组织从多个服务收集日志,并使它们以标准格式提供给多个服务器。
  • 流处理 - 流行的框架(如Storm和Spark Streaming)从主题中读取数据,对其进行处理,并将处理后的数据写入新主题,供用户和应用程序使用。 Kafka的强耐久性在流处理的上下文中也非常有用。
三、几大MQ比较


几大MQ对比

四、Kafka基础
4.1 几大术语
  • producer和consumer:消息的发送者叫 Producer,消息的使用者和接受者是 Consumer,生产者将数据保存到 Kafka 集群中,消费者从中获取消息进行业务的处理。

  • broker:Kafka 集群中有很多台 Server,其中每一台 Server 都可以存储消息,将每一台 Server 称为一个 kafka 实例,也叫做 broker。

  • topic:一个 topic 里保存的是同一类消息,相当于对消息的分类,每个 producer 将消息发送到 kafka 中,都需要指明要存的 topic 是哪个,也就是指明这个消息属于哪一类。

  • partition:每个 topic 都可以分成多个 partition,每个 partition 在存储层面是 append log 文件。任何发布到此 partition 的消息都会被直接追加到 log 文件的尾部。为什么要进行分区呢?最根本的原因就是:kafka基于文件进行存储,当文件内容大到一定程度时,很容易达到单个磁盘的上限,因此,采用分区的办法,一个分区对应一个文件,这样就可以将数据分别存储到不同的server上去,另外这样做也可以负载均衡,容纳更多的消费者。

  • Offset:一个分区对应一个磁盘上的文件,而消息在文件中的位置就称为 offset(偏移量),offset 为一个 long 型数字,它可以唯一标记一条消息。由于kafka 并没有提供其他额外的索引机制来存储 offset,文件只能顺序的读写,所以在kafka中几乎不允许对消息进行“随机读写”。

五、Kafka的安装

​ 首先要安装JDK(这个就不说了~)

​ Apache Kafka 的一个关键依赖是 Apache Zookeeper,它是一个分布式配置和同步服务。Zookeeper 是 Kafka 代理和消费者之间的协调接口。Kafka 服务器通过 Zookeeper 集群共享信息。Kafka 在 Zookeeper 中存储基本元数据,例如关于主题,代理,消费者偏移(队列读取器)等的信息。

​ 因此首先需要安装Zookeeper

5.1 Zookeeper安装

安装链接为:http://zookeeper.apache.org/releases.html

下载的是tar.gz

1
2
3
$ tar -zxf zookeeper-3.4.6.tar.gz
$ cd zookeeper-3.4.6
$ mkdir data

创建配置文件

1
2
3
4
5
6
$ vim conf/zoo.cfg
tickTime=2000
dataDir=/path/to/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2

启动Zookeeper服务器

1
$ bin/zkServer.sh start
5.2 Kafka安装

下载链接:https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz

解压

1
2
3
$ cd opt/
$ tar -zxf kafka_2.11.0.9.0.0 tar.gz
$ cd kafka_2.11.0.9.0.0

启动Kafka

1
$ bin/kafka-server-start.sh config/server.properties

下一节我们将讨论Kafka的架构和基本操作

Redis cluster集群模式介绍

一、Redis主从概念

​ 为了避免单点故障,通常的做法是将数据库复制多个副本以部署在不同的服务器上,这样即使有一台服务器出现故障,其他服务器依然可以继续提供服务。为此, Redis 提供了复制(replication)功能,可以实现当一台数据库中的数据更新后,自动将更新的数据同步到其他数据库上。

​ 在复制的概念中,数据库分为两类,一类是主数据库(master),另一类是从数据库(slave)。主数据库可以进行读写操作,当写操作导致数据变化时会自动将数据同步给从数据库。而从数据库一般是只读的,并接受主数据库同步过来的数据。一个主数据库可以拥有多个从数据库,而一个从数据库只能拥有一个主数据库。

二、为什么要有集群?

​ redis的哨兵模式基本已经可以实现高可用,读写分离 ,但是在这种模式下每台redis服务器都存储相同的数据,很浪费内存,所以在redis3.0上加入了cluster模式,实现的redis的分布式存储,也就是说每台redis节点上存储不同的内容。

三、数据分布算法
3.1 hash算法

​ 哈希算法的思想非常简单,也许你知道 HashMap 的哈希函数,哈希算法跟 HashMap 一样,也是通过一个哈希函数得到某一个数字,然后根据数字找到相应的服务器。

​ 比如有N个redis实例,那么如何将key映射到redis上呢?

​ 通常是采用hash算法计算key的hash值,然后通过取模,均匀的映射到N个redis服务器上,如果增加一个服务器,那么映射公式就变成了hash(key)%(N+1),如果有一个服务器宕机了,映射公式变成了hash(key)%(N-1)

​ 这种情况下,几乎所有的缓存都失效了,会导致数据库访问的压力陡增,甚至导致数据库宕机

3.2 一致性hash算法

​ 一致性哈希算法可以说是哈希算法的升级版,解决了哈希算法扩展性差的问题,一致性哈希算法跟哈希算法不一样,一致性哈希算法会将服务器和数据都通过哈希函数映射到一个首尾相连的哈希环上

具体步骤如下:

  1. 首先求出服务器(节点)的哈希值,并将其配置到0~232的圆(continuum)上。
  2. 然后采用同样的方法求出存储数据的键的哈希值,并映射到相同的圆上。
  3. 然后从数据映射到的位置开始顺时针查找,将数据保存到找到的第一个服务器上。如果超过232仍然找不到服务器,就会保存到第一台服务器上。

hash1

hash2

可能存在的问题:

​ 一个master宕机不会导致大部分缓存失效,可能存在缓存热点问题 ,比如某个区间的值特别多没那么会导致大量的数据都涌入一个服务器内,造成缓存热点问题,出现性能瓶颈

hash3

3.3 用虚拟节点改进的一致性hash算法

​ 缓存热点问题是在服务器节点数非常少的时候容易出现,因此,我们通过添加虚拟节点的方式,来使的服务器节点数变多,从而大量数据均匀分布,而不是涌入到一个服务器内

hash4

3.4 rediscluster的hash slot算法

redis cluster中每个master都会持有部分slot,比如有3个master,那么可能每个master持有5000多个hash slot 。hash slot让node的增加和移除很简单,增加一个master,就将其他master的hash slot移动部分过去,减少一个master,就将它的hash slot移动到其他master上去

客户端的api,可以对指定的数据,让他们走同一个hash slot,通过hash tag来实现

客户端向节点发送键命令,节点要计算这个键属于哪个槽。如果是自己负责这个槽,那么直接执行命令,如果不是,向客户端返回一个MOVED错误,指引客户端转向正确的节点。

四、节点间的通信机制

1.基础通信

(1)redis cluster节点间采取gossip协议进行通信

跟集中式不同,不是将集群元数据(节点信息,故障,等等)集中存储在某个节点上,而是互相之间不断通信,保持整个集群所有节点的数据是完整的

集中式:好处在于,元数据的更新和读取,时效性非常好,一旦元数据出现了变更,立即就更新到集中式的存储中,其他节点读取的时候立即就可以感知到; 不好在于,所有的元数据的跟新压力全部集中在一个地方,可能会导致元数据的存储有压力

gossip:好处在于,元数据的更新比较分散,不是集中在一个地方,更新请求会陆陆续续,打到所有节点上去更新,有一定的延时,降低了压力; 缺点,元数据更新有延时,可能导致集群的一些操作会有一些滞后

(2)10000端口

每个节点都有一个专门用于节点间通信的端口,就是自己提供服务的端口号+10000,比如7001,那么用于节点间通信的就是17001端口

每隔节点每隔一段时间都会往另外几个节点发送ping消息,同时其他几点接收到ping之后返回pong

(3)交换的信息

故障信息,节点的增加和移除,hash slot信息,等等

2. gossip协议

  1. meet: 某个节点发送meet给新加入的节点,让新节点加入集群中,然后新节点就会开始与其他节点进行通信

  2. ping: 每个节点都会频繁给其他节点发送ping,其中包含自己的状态还有自己维护的集群元数据,互相通过ping交换元数据 每个节点每秒都会频繁发送ping给其他的集群,ping,频繁的互相之间交换数据,互相进行元数据的更新

  3. pong: 返回ping和meet,包含自己的状态和其他信息,也可以用于信息广播和更新

  4. fail: 某个节点判断另一个节点fail之后,就发送fail给其他节点,通知其他节点,指定的节点宕机了

五、redis cluster 的高可用与主备切换原理

redis cluster 的高可用的原理,几乎跟哨兵是类似的。

5.1 判断节点宕机

如果一个节点认为另外一个节点宕机,那么就是 pfail主观宕机。如果多个节点都认为另外一个节点宕机了,那么就是 fail客观宕机,跟哨兵的原理几乎一样,sdownodown

cluster-node-timeout 内,某个节点一直没有返回 pong,那么就被认为 pfail

如果一个节点认为某个节点 pfail 了,那么会在 gossip ping 消息中,ping 给其他节点,如果超过半数的节点都认为 pfail 了,那么就会变成 fail

5.2 从节点过滤

对宕机的 master node,从其所有的 slave node 中选择一个切换成 master node

检查每个 slave node 与 master node 断开连接的时间,如果超过了 cluster-node-timeout * cluster-slave-validity-factor,那么就没有资格切换成 master

5.3 从节点选举

每个从节点,都根据自己对 master 复制数据的 offset,来设置一个选举时间,offset 越大(复制数据越多)的从节点,选举时间越靠前,优先进行选举。

所有的 master node 开始 slave 选举投票,给要进行选举的 slave 进行投票,如果大部分 master node(N/2 + 1)都投票给了某个从节点,那么选举通过,那个从节点可以切换成 master。

从节点执行主备切换,从节点切换为主节点。

5.4 与哨兵比较

整个流程跟哨兵相比,非常类似,所以说,redis cluster 功能强大,直接集成了 replication 和 sentinel 的功能。

参考

部分文字参考链接:https://www.nowcoder.com/discuss/364714?type=post&order=time&pos=&page=0

图片为作者所绘制,不存在参考

TopK问题用快排和堆排的复杂度分别是多少?

  1. 关于TopK问题

    TopK问题就是在一堆数据里面找到前 K 大(当然也可以是前 K 小)的数

  2. 常规方法,完全排序

    先完全排序后取topK,这种方法需要将数据完全排序,不适用于大数据量

  3. 利用快排
    3.1 解决思路

    借鉴快排的思想,在patiton中数组会分为三个部分,我们只要与index相比较就可以得出TopK是在左边部分还是右边部分,因此不需要全部排序

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    public class Solution {
    //要求的第k个数
    int k;
    List<Integer> res = new ArrayList<>();
    public List<Integer> GetLeastNumbers(int [] input, int k) {
    if (input.length < k || k < 1) {
    return res;
    }
    this.k = k;
    quick_sort(input, 0, input.length - 1);
    return res;
    }

    private void quick_sort(int[] arr, int l, int r) {
    if(l > r) {
    return ;
    }
    int index = patiton(arr, l, r);
    //此处判断就是为了变成局部排序
    if(index == k - 1) {
    for (int i = 0; i <= index; i++) {
    res.add(arr[i]);
    }
    return;
    }else if(index >= k) {
    quick_sort(arr, l ,index - 1);
    }else {
    quick_sort(arr, index + 1, r);
    }
    }

    private int patiton(int[] arr, int l, int r) {
    int tmp=arr[l];
    while(l<r){
    while(l<r&&arr[r]>=tmp){
    r--;
    }
    arr[l]=arr[r];
    while (l<r&&arr[l]<=tmp){
    l++;
    }
    arr[r]=arr[l];
    }
    arr[l]=tmp;
    return l;
    }
    }
    3.2 复杂度分析

    ​ 与快排完全排序不同,每次分割后的数组大小近似为原数组大小的一半,因此这种方法的时间复杂度实际上是O(N)+O(N/2)+O(N/4)+……<O(2N),因此时间复杂度为**O(N)**,但是这种方法需要提前将N个数读入,对于海量数据来说,对空间开销很大

  4. 堆排序法
    4.1 解决思路

    ​ 先随机取出N个数中的K个数,将这N个数构造为小顶堆,那么堆顶的数肯定就是这K个数中最小的数了,然后再将剩下的N-K个数与堆顶进行比较,如果大于堆顶,那么说明该数有机会成为TopK,就更新堆顶为该数,此时由于小顶堆的性质可能被破坏,就还需要调整堆

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    public class Solution {
    ArrayList<Integer> res = new ArrayList<>();
    public ArrayList<Integer> GetLeastNumbers_Solution(int [] input, int k) {
    if (input.length < k || k < 1) {
    return res;
    }
    ArrayList<Integer> res = new ArrayList<>();
    PriorityQueue<Integer> queue = new PriorityQueue<>(new Comparator<Integer>() {
    public int compare(Integer a, Integer b) {
    return b - a;
    }
    });
    int len = input.length;
    if(len < k || k < 1) {
    return res;
    }
    for(int i = 0; i < len; i++) {
    if(queue.size() < k) {
    queue.add(input[i]);
    }else {
    if(queue.peek() > input[i]) {
    queue.poll();
    queue.add(input[i]);
    }

    }
    }
    return new ArrayList<>(queue);
    }
    }
    4.2 复杂度分析

    ​ 首先需要对K个元素进行建堆,时间复杂度为O(k),然后要遍历数组,最坏的情况是,每个元素都与堆顶比较并排序,需要堆化n次 所以是O(nlog(k)),因此总复杂度是O(k+nlog(k));

    5.小结

    ​ 快排和堆排解决topK问题的复杂度其实面试中有被问过,有不少面试者直接答的快排和堆排的复杂度,但其实是不一样的,在TopK问题中,快排和堆排的复杂度分别问O(n)和O(k+nlog(k))

    ​ 通过对比可以发现,堆排的优势是只需读入K个数据即可,可以实现来一个数据更新一次,能够很好的实现数据动态读入并找出TopK。