上一篇 MySQL MGR成员管理与故障恢复实现 我们从方案层面讲解了MGR的成员管理和故障恢复。本篇从源码层面捋一捋,本篇内容可以分为2个部分,部分对应的是start group_replication命令执行过程,第二部分对应命令返回后节点状态从RECOVERING(恢复中)到ONLINE(在线)的过程。
start group_replication处理逻辑
该命令用于将一个正在运行且已经install了group_replication.so插件的mysqld加入到group中。又可以根据是否设置了group_replication_bootstrap_group来分为创建一个新的group或加入一个已有的group。start group_replication对应的实现函数为plugin_group_replication_start,其做些基本的检查后交由initialize_plugin_and_join()来执行节点加入group的任务,下图为该函数的主要执行流程。
下面我们注意对其9个阶段进行逐一分析:
1、调用Gcs_operations::initialize()来创建gcs_interface对象并设置gcs日志系统;
2、根据用户配置的MGR参数设置GCS,其中重要的逻辑是调用Gcs_xcom_interface::initialize_xcom()来进行MGR目前执行的gcs实现xcom的初始化,该函数详见后续小节描述。
3、设置本节点的MGR成员管理对象,包括本节点信息维护对象local_member_info和集群信息维护对象group_member_mgr,此时group_member_mgr还只有本地节点的信息。
4、initialize_recovery_module()函数初始化节点的MGR recovery模块,包括创建Recovery_module对象,为其设置节点上线策略(认证完上线还是回放完上线),设置全局故障恢复节点donor节点连接失败重试的次数和失败重连的时间间隔。Recovery_module模块在start group_replication命令返回后,负责将节点从恢复中变为在线状态。
5、接下来调用configure_and_start_applier_module()函数配置和启动节点的MGR Applier_module模块,该模块是MGR的核心模块之一,用于对通过xcom进入该节点的包括用户事务数据包(package)在内的众多行为(action)/事件(event)进行分发、认证和执行。
首先创建一个Applier_module对象,并将其赋予Recovery_module模块;接着对Applier_module对象进行初始化,包括创建用于缓存等待处理的消息的队列incoming,初始化模块的处理机制pipeline;后启动一个新的线程,并制定处理函数applier_thread_handle(),用于具体处理这些入队的消息。MGR使用pipeline管道的方式处理每个消息,pipeline设置和初始化详见后续小节分析。
6、在单主模式下,调用initialize_asynchronous_channels_observer()进行节点异步复制通道的操作行为监听。主要监听行为是io thread和sql thread启动操作。原因是在单主模式下,只有primary节点能接收外部写请求,所以需要禁止MGR单主模式的secondary节点启动非MGR的异步复制和回放Relay log的通道。而多主模式下各节点均可写,不存在该限制。
7、下一步initialize_group_partition_handler()用于初始化MGR的网络分区处理对象group_partition_handler。应对MGR中的节点因为网络问题导致相互间无法进行正常的通信。当有节点无法连上其他节点时,就会启动handler->partition_thread_handler()网络分区处理线程,若在超时时间内()网络无法恢复,则会进行网络分区处理,包括退出group并rollback已进入MGR的事务等。
8、调用start_group_communication()启动节点与group中其他节点的通信是及其重要的步骤。直接决定了该节点是否能够正常加入group。
该函数首先设置节点的自增列的自增区间和自增初始值。然后初始化group集群视图变化状态设置监听对象view_change_notifier,该notifier会设置和监控视图变化情况并作出相应处理,初始化非常重要的events_handler,该handler是向gcs注册的事件回调接口,用于处理gcs(paxos)返回的各种消息/事件,包括用户事务数据包、group变化的控制包等。接着,view_change_notifier设置view_changing变量,表示正式进入视图切换阶段(当view_changing重新变为false时,start group_replication即可返回)。后调用Gcs_operations::join()函数执行将该节点加入group的操作,该函数详见后续小节描述。
9、以上就是start group_replication命令所要执行的所有操作,在完成这些操作后,initialize_plugin_and_join()调用view_change_notifier->wait_for_view_modification()来等待view_changing变为false,之命令返回。若返回值非零,则意味着加入失败,需要进行操作回滚。若返回值为零,则节点进入故障恢复流程,终将节点设置为在线状态。
Gcs_xcom_interface::initialize_xcom()
initialize_xcom()用于初始化节点的xcom实例。但此时该实例仍未运行和加入的group。
如上图所示,该函数主要是初始化xcom实例的group信息及相关对象,设置将该xcom加入group所需的种子节点,注册各种xcom消息的回调处理函数,创建xcom的各种操作接口xcom_proxy,后创建Gcs_xcom_engine对象,并调用Gcs_xcom_engine::initialize()来启动节点的xcom处理引擎,xcom引擎通过独立线程的处理函数process_notification_thread来调用2.2.1.3.3设置的回调函数处理各种xcom消息。process_notification_thread的处理逻辑如下所示:
即不断判断m_notification_queue是否有消息,若有则将其取出,调用该消息对应的回调函数进行处理。
那么这些消息如何加入m_notification_queue队列呢,详见下图例子:
deliver_to_app是MGR底层paxos将已达成一致性的消息发送给上层的主要接口之一,其调用在该类消息处理函数xcom_receive_data()进行处理,处理操作很简单,经创建一个Data_notification对象,参数中指定了具体的消息处理函数,后将其push到我们上面所述的m_notification_queue队列。我们进一步看看do_cb_xcom_receive_data()如何处理该消息:
首先,将从paxos接收到的消息进行pipeline处理(注意跟Applier_module的pipeline相区分),目前pipeline仅可注册一个stage,即Gcs_message_stage_lz4,用于对paxos消息进行压缩。可通过参数group_replication_compression_threshold来设置进行消息压缩的阈值。
接着,若消息类型为进行视图切换时产生的成员状态交换消息(CT_INTERNAL_STATE_EXCHANGE),调用process_control_message()函数进行处理。否则,判断是否正在进行视图切换,若是,则暂时buffer住这些消息,等到视图完成切换后(即成员加入group后)再处理。否则,调用上层为每种消息注册的回调函数on_message_received()进行处理。
下面,我们简要介绍process_control_message()和on_message_received():
on_view_changed是各个节点对视图切换的响应,处理流程如下:
on_message_received()是个消息分发枢纽,其操作如下:
如上图所示,根据消息类型的不同,分别调用不同的处理函数,其中对于单主模式新primary完成relay log回放后的消息和事务性消息(事务和视图数据包)均进入到applier_module的incoming队列按顺序执行。其他三种消息分别是用于节点间进行事务执行状态周期性同步的CT_PIPELINE_STATS_MEMBER_MESSAGE消息,该消息是performance_schema.replication_group_member_stats和replication_group_members的主要数据来源。会触发节点进入流控模式。CT_RECOVERY_MESSAGE消息用于在节点完成故障恢复后将自己设置为在线状态前给group发送的广播消息,确保其他节点及时感知节点状态变化。CT_CERTIFICATION_MESSAGE是周期性发送的,用于对applier_module模块的冲突检测数据库进行无用信息purge的消息。
Applier_module的pipeline实现
MGR使用pipeline管道的方式处理每个消息,pipeline设置和初始化详见后续小节分析。目前官方设置了3个pipeline处理器(handler),分别是CATALOGING_HANDLER、CERTIFICATION_HANDLER和SQL_THREAD_APPLICATION_HANDLER,其中CATALOGING_HANDLER用于待处理的事件进行分类,主要通过判断事件类型是否为binary_log::TRANSACTION_CONTEXT_EVENT来设置事务开始的标志,并判断是否为SINGLE_VIEW_EVENT来标识处理视图变更事件。CERTIFICATION_HANDLER是核心的pipeline处理器,在各个节点采用相同的规则独立进行事务认证,包括认证模块初始化和销毁、事务快照版本解析、对事务进行冲突检测和在视图变更的时候初始化冲突检测数据库等。SQL_THREAD_APPLICATION_HANDLER用于并行回放通过了认证的异地事务,更新本地的数据库版本。3个pipeline的汇总信息如下图所示:
Gcs_operations::join()
该接口用于在start group_replication时将节点加入group。具体流程如下:
首先进行通信(gcs_communication)和控制(gcs_control)接口初始化,确保从gcs层发送上来的各种消息能够被正确执行。后调用Gcs_xcom_control::join()来执行终的节点入群操作,该函数判断是创建group还是加入group场景,并在Gcs_xcom_control::do_join()中进行对应处理,do_join在Gcs_xcom_interface::initialize_xcom()的基础上完成后的加入操作,流程如下:
先创建m_xcom_thread线程,通过proxy->xcom_init()进行后的初始化,让xcom实例进入运行状态;在确保xcom实例通信正常的情况下,分别调用proxy->xcom_client_boot()或proxy->xcom_client_add_node()创建一个group或加入已存在的group。
2个遗留问题
上面就是start group_replication命令的全部操作。经过上面介绍后,还有2个大疑问没有解释清楚:
1、该命令在什么时候返回?
上面我们只是说在view_changing为false的时候返回,那么该变量什么时候会变为false呢,首先需要明确的是在前述的Plugin_gcs_events_handler::on_view_changed()函数中设置,对于执行该命令的节点,会在Plugin_gcs_events_handler::handle_joining_members()函数中调用view_change_notifier->end_view_modification()设置,并进行广播:
2、在介绍process_control_message()时我们知道该函数是接收了CT_INTERNAL_STATE_EXCHANGE消息后调用执行的。那么该消息是在什么场景下发出的?
下面我们从源头开始进行简单介绍:
detector是paxos层的一个定时任务机制,用于发现group的成员变化情况,并进行相应的处理。对于我们所述的场景,有一个新的节点加入paxos group中,所以会触发该任务发送一个view_msg消息。终调用do_cb_xcom_receive_global_view进行处理。具体逻辑为:
该函数会获取自己节点的状态信息get_exchangeable_data(),并使用Gcs_xcom_state_exchange::broadcast_state()来广播一个CT_INTERNAL_STATE_EXCHANGE。
由于细节实在太多,无法对每个函数进行详细介绍,权当抛砖引玉,梳理下整个源码执行流程。通过上述流程,可以对MGR的整个代码框架有个初步的认识,感兴趣的同学可以一起深入讨论交流。后放一个MGR模块图作为总结: