MySQL Online DDL增量DML记录和回放的源码实现
温正湖2020-03-25 13:42:17

中分析并验证了MySQL进行在线创建索引时,不会因为执行时间过长或业务压力较大,在回放增量DML时加锁时间过久而对业务造成严重影响,本文从MySQL 8.0.19源码出发,分析MySQL是如何实现的。同时也确认是否在回放DML时会报duplicate key。

核心处理流程和对象

增量DML处理流程主要在http://row0log.cc中。

/** @file row/row0log.cc
Modification log for online index creation and online table rebuild

Created 2011-05-26 Marko Makela

通过阅读本文件代码可以发现,创建二级索引的增量DML记录和回放流程跟其他类型DDL是分开的。因为二级索引创建不需要重建表。

创建二级索引的处理流程

缓存增量DML操作

row_log_online_op用于处理创建二级索引的增量DML:

/** Logs an operation to a secondary index that is (or was) being created. */
void row_log_online_op(
dict_index_t *index, /*!< in/out: index, S or X latched */
const dtuple_t *tuple, /*!< in: index tuple */
trx_id_t trx_id) /*!< in: transaction ID for insert,
or for delete */
{

该函数直接调用os_file_write_int_fd将日志写入到临时文件中: 

err = os_file_write_int_fd(request, "(modification log)", log->fd,
log->tail.block, byte_offset, srv_sort_buf_size);

该场景下,写入临时文件的内容为正在创建的二级索引记录,无需写入聚集/主键索引记录。这样可以大大减少临时文件的数据写入量,二级索引记录构造函数如下: 

rec_convert_dtuple_to_temp(b + extra_size, index, tuple->fields,
tuple->n_fields, NULL);

回放增量DML操作

创建二级索引的增量DML回放入口为row_log_apply:

/** Apply the row log to the index upon completing index creation.
@param[in] trx transaction (for checking if the operation was
interrupted)
@param[in,out] index secondary index
@param[in,out] table MySQL table (for reporting duplicates)
@param[in,out] stage performance schema accounting object, used by
ALTER TABLE. stage->begin_phase_log_index() will be called initially and then
stage->inc() will be called for each block of log that is applied.
@return DB_SUCCESS, or error code on failure */
dberr_t row_log_apply(const trx_t *trx, dict_index_t *index,
struct TABLE *table, ut_stage_alter_t *stage) {

...
rw_lock_x_lock(dict_index_get_lock(index));

if (!index->table->is_corrupted()) {
error = row_log_apply_ops(trx, index, &dup, stage);
} else {
error = DB_SUCCESS;
}
...
} else {
ut_ad(dup.n_dup == );
dict_index_set_online_status(index, ONLINE_INDEX_COMPLETE);
}

log = index->online_log;
index->online_log = NULL;
rw_lock_x_unlock(dict_index_get_lock(index));

从这段代码可以发现调用row_log_apply_ops实际执行增量DLM回放前加了对应二级索引的互斥锁。回放完成,将索引状态设置为ONLINE_INDEX_COMPLETE,最后进行解锁。

我们在测试过程已经发现,增量回放过程是不会加长时间锁的,这跟代码实现似乎有冲突。我们在后面分析row_log_apply_ops小结揭晓。

表重建场景的处理流程

该场景不是本文分析重点,在此简单说明下。

对于需重建表的DDL场景,DML操作处理函数分别为row_log_table_apply_insert、row_log_table_apply_update和row_log_table_apply_delete,在函数的注释上进一步注明了用于回放对应DML操作的函数:

/** Logs an insert to a table that is being rebuilt.
This will be merged in row_log_table_apply_insert(). */
void row_log_table_insert(
const rec_t *rec, /*!< in: clustered index leaf page record,
page X-latched */
const dtuple_t *ventry, /*!< in: dtuple holding virtual column info */
dict_index_t *index, /*!< in/out: clustered index, S-latched
or X-latched */
const ulint *offsets) /*!< in: rec_get_offsets(rec,index) */
{
row_log_table_low(rec, ventry, NULL, index, offsets, true, NULL);
}

/** Logs an update to a table that is being rebuilt.
This will be merged in row_log_table_apply_update(). */
void row_log_table_update(
const rec_t *rec, /*!< in: clustered index leaf page record,
page X-latched */
dict_index_t *index, /*!< in/out: clustered index, S-latched
or X-latched */
const ulint *offsets, /*!< in: rec_get_offsets(rec,index) */
const dtuple_t *old_pk, /*!< in: row_log_table_get_pk()
before the update */
const dtuple_t *new_v_row, /*!< in: dtuple contains the new virtual
columns */
const dtuple_t *old_v_row) /*!< in: dtuple contains the old virtual
columns */
{
row_log_table_low(rec, new_v_row, old_v_row, index, offsets, false, old_pk);
}

/** Logs a delete operation to a table that is being rebuilt.
This will be merged in row_log_table_apply_delete(). */
void row_log_table_delete(
trx_t *trx, /*!< in: current transaction */
const rec_t *rec, /*!< in: clustered index leaf page record,
page X-latched */
const dtuple_t *ventry, /*!< in: dtuple holding virtual column info */
dict_index_t *index, /*!< in/out: clustered index, S-latched
or X-latched */
const ulint *offsets, /*!< in: rec_get_offsets(rec,index) */
const byte *sys) /*!< in: DB_TRX_ID,DB_ROLL_PTR that should
be logged, or NULL to use those in rec */

上述3个函数均是调用row_log_table_close执行增量DML日志格式化和写入操作: 

/** Stops logging an operation to a table that is being rebuilt. */
static void row_log_table_close_func(
row_log_t *log, /*!< in/out: online rebuild log */
#ifdef UNIV_DEBUG
const byte *b, /*!< in: end of log record */
#endif /* UNIV_DEBUG */
ulint size, /*!< in: size of log record */
ulint avail) /*!< in: available size for log record */
{

该函数再最后调用os_file_write_int_fd 

err = os_file_write_int_fd(request, "(modification log)", log->fd,
log->tail.block, byte_offset, srv_sort_buf_size);

总的来说,创建二级索引和重建表都需要处理增量DML,但处理方式不一样,相对来说,创建二级索引场景更加简单,因为只需要处理新增的二级索引记录即可。

增量DML记录和回放核心对象

row_log_t对象

不管是创建二级索引还是进行表重建,处理增量DML的核心对象都是row_log_t,该对象具体内容如下所示:

/** @brief Buffer for logging modifications during online index creation

All modifications to an index that is being created will be logged by
row_log_online_op() to this buffer.

All modifications to a table that is being rebuilt will be logged by
row_log_table_delete(), row_log_table_update(), row_log_table_insert()
to this buffer.

When head.blocks == tail.blocks, the reader will access tail.block
directly. When also head.bytes == tail.bytes, both counts will be
reset to 0 and the file will be truncated. */
struct row_log_t {
int fd; /*!< file descriptor */
ib_mutex_t mutex; /*!< mutex protecting error,
max_trx and tail */
page_no_map *blobs; /*!< map of page numbers of off-page columns
that have been freed during table-rebuilding
ALTER TABLE (row_log_table_*); protected by
index->lock X-latch only */
dict_table_t *table; /*!< table that is being rebuilt,
or NULL when this is a secondary
index that is being created online */
bool same_pk; /*!< whether the definition of the PRIMARY KEY
has remained the same */
const dtuple_t *add_cols;
/*!< default values of added columns, or NULL */
const ulint *col_map; /*!< mapping of old column numbers to
new ones, or NULL if !table */
dberr_t error; /*!< error that occurred during online
table rebuild */
trx_id_t max_trx; /*!< biggest observed trx_id in
row_log_online_op();
protected by mutex and index->lock S-latch,
or by index->lock X-latch only */
row_log_buf_t tail; /*!< writer context;
protected by mutex and index->lock S-latch,
or by index->lock X-latch only */
row_log_buf_t head; /*!< reader context; protected by MDL only;
modifiable by row_log_apply_ops() */
ulint n_old_col;
/*!< number of non-virtual column in
old table */
ulint n_old_vcol;
/*!< number of virtual column in old table */
const char *path; /*!< where to create temporary file during
log operation */
};

这里我们仅分析创建二级索引场景,关注的字段包括fd、tail、head和path。

fd和path分别表示缓存增量DML的文件路径和文件句柄。path所在目录为所设置的innodb_tmpdir指定,若该值为空,则设置为tmpdir对应目录。

tail和head为row_log_buf_t对象,分别用于进行增量DML缓存和回放。我们单独放在一个小结说明。

缓存增量DML的临时文件

临时文件由row_log_tmpfile创建并打开:

/** Create the file or online log if it does not exist.
@param[in,out] log online rebuild log
@return true if success, false if not */
static MY_ATTRIBUTE((warn_unused_result)) int row_log_tmpfile(row_log_t *log) {
DBUG_TRACE;
if (log->fd < ) {
log->fd = row_merge_file_create_low(log->path);
DBUG_EXECUTE_IF("row_log_tmpfile_fail",
if (log->fd > ) row_merge_file_destroy_low(log->fd);
log->fd = -1;);
if (log->fd >= ) {
MONITOR_ATOMIC_INC(MONITOR_ALTER_TABLE_LOG_FILES);
}
}

return log->fd;
}

/** Create temporary merge files in the given paramater path, and if
UNIV_PFS_IO defined, register the file descriptor with Performance Schema.
@param[in] path location for creating temporary merge files.
@return File descriptor */
int row_merge_file_create_low(const char *path) {
int fd;
if (path == NULL) {
path = innobase_mysql_tmpdir();
}
#ifdef UNIV_PFS_IO
/* This temp file open does not go through normal
file APIs, add instrumentation to register with
performance schema */
Datafile df;
df.make_filepath(path, "Innodb Merge Temp File", NO_EXT);

struct PSI_file_locker *locker = NULL;
PSI_file_locker_state state;

locker = PSI_FILE_CALL(get_thread_file_name_locker)(
&state, innodb_temp_file_key.m_value, PSI_FILE_OPEN, df.filepath(),
&locker);

if (locker != NULL) {
PSI_FILE_CALL(start_file_open_wait)(locker, __FILE__, __LINE__);
}
#endif /* UNIV_PFS_IO */
fd = innobase_mysql_tmpfile(path);
#ifdef UNIV_PFS_IO
if (locker != NULL) {
PSI_FILE_CALL(end_file_open_wait_and_bind_to_descriptor)(locker, fd);
}
#endif /* UNIV_PFS_IO */

if (fd < ) {
ib::error(ER_IB_MSG_967) << "Cannot create temporary merge file";
return (-1);
}
return (fd);
}

从中可以看出,所创建的文件名为“Innodb Merge Temp File”,可通过performance_schema.file_instances等系统表查看临时文件位置和相关统计信息。如下所示: 

node1-performance_schema>select * from file_instances  where FILE_NAME like "%%Innodb Merge Temp File%%"\G      *************************** 1. row ***************************
FILE_NAME: /tmp/Innodb Merge Temp File
EVENT_NAME: wait/io/file/innodb/innodb_temp_file
OPEN_COUNT: 2
1 row in set (0.00 sec)

node1-performance_schema>select * from file_summary_by_instance where FILE_NAME like "%%Innodb Merge Temp File%%"\G
*************************** 1. row ***************************
FILE_NAME: /tmp/Innodb Merge Temp File
EVENT_NAME: wait/io/file/innodb/innodb_temp_file
OBJECT_INSTANCE_BEGIN: 140548089243840
COUNT_STAR: 18484
SUM_TIMER_WAIT: 7393902183975
MIN_TIMER_WAIT: 76528245
AVG_TIMER_WAIT: 400015995
MAX_TIMER_WAIT: 27160453440
COUNT_READ: 9240
SUM_TIMER_READ: 2499001980465
MIN_TIMER_READ: 183015375
AVG_TIMER_READ: 270454725
MAX_TIMER_READ: 27160453440
SUM_NUMBER_OF_BYTES_READ: 9688842240
COUNT_WRITE: 9240
SUM_TIMER_WRITE: 4894539195270
MIN_TIMER_WRITE: 385078965
AVG_TIMER_WRITE: 529711680
MAX_TIMER_WRITE: 1293598650
SUM_NUMBER_OF_BYTES_WRITE: 9688842240
COUNT_MISC: 4
SUM_TIMER_MISC: 361008240
MIN_TIMER_MISC: 76528245
AVG_TIMER_MISC: 90252060
MAX_TIMER_MISC: 106280070
1 row in set (0.00 sec)

...

临时文件大小由参数innodb_online_alter_log_max_size确定。 

node1-sbtest>show variables like "%%innodb_online_alter_log_max_size%%";
+----------------------------------+-----------+
| Variable_name | Value |
+----------------------------------+-----------+
| innodb_online_alter_log_max_size | 134217728 |
+----------------------------------+-----------+
1 row in set (.01 sec)

该参数默认值为128M,可在线调整,若在执行过程中将该参数调小或设置值不够大,会导致DDL操作失败,如下例子所示: 

node1-performance_schema>show variables like "%%innodb_online_alter_log_max_size%%";
+----------------------------------+-------+
| Variable_name | Value |
+----------------------------------+-------+
| innodb_online_alter_log_max_size | 65536 |
+----------------------------------+-------+
1 row in set (0.00 sec)

node1-sbtest>alter table sbtest1 add index idx_d(wzh);
ERROR 1799 (HY000): Creating index 'idx_d' required more than 'innodb_online_alter_log_max_size' bytes of modification log. Please try again.

row_log_buf_t

row_log_buf_t是另一个重要对象,定义如下:

/** Log block for modifications during online ALTER TABLE */
struct row_log_buf_t {
byte *block; /*!< file block buffer */
ut_new_pfx_t block_pfx; /*!< opaque descriptor of "block". Set
by ut_allocator::allocate_large() and fed to
ut_allocator::deallocate_large(). */
mrec_buf_t buf; /*!< buffer for accessing a record
that spans two blocks */
ulint blocks; /*!< current position in blocks */
ulint bytes; /*!< current position within block */
ulonglong total; /*!< logical position, in bytes from
the start of the row_log_table log;
for row_log_online_op() and
row_log_apply(). */
};

根据定义,进一步结合处理流程可以知道,增量DML日志的缓存(写入临时文件)和回放(读取临时文件)时以记录块为单位进行的。一个记录块可保存一条或多条增量DML日志。一条增量DML日志可能跨2个记录块。

在row_log_buf_t对象中,block字段表示当前正在操作的最后一个未满的记录块,bytes是该记录块已使用的字节数,blocks表示已经往临时文件中写入多少个记录块。buf用于处理一条DML日志横跨2个记录块的场景。

记录块的大小由参数innodb_sort_buffer_size指定:

node1-performance_schema>show variables like "%%innodb_sort_buffer_size%%";
+-------------------------+---------+
| Variable_name | Value |
+-------------------------+---------+
| innodb_sort_buffer_size | 1048576 |
+-------------------------+---------+
1 row in set (.01 sec)

参数默认为1MB,该参数为只读参数,无法动态调整。

增量DML写入实现分析

我们首先看看row_log_online_op函数的调用场景,经查询发现大致有2处调用,分别为row_log_online_op_try和row_upd_sec_index_entry_low,如下所示:

/** Try to log an operation to a secondary index that is
(or was) being created.
@retval true if the operation was logged or can be ignored
@retval false if online index creation is not taking place */
UNIV_INLINE
bool row_log_online_op_try(
dict_index_t *index, /*!< in/out: index, S or X latched */
const dtuple_t *tuple, /*!< in: index tuple */
trx_id_t trx_id) /*!< in: transaction ID for insert,
or 0 for delete */
{
ut_ad(rw_lock_own_flagged(dict_index_get_lock(index),
RW_LOCK_FLAG_S | RW_LOCK_FLAG_X | RW_LOCK_FLAG_SX));

switch (dict_index_get_online_status(index)) {
case ONLINE_INDEX_COMPLETE:
/* This is a normal index. Do not log anything.
The caller must perform the operation on the
index tree directly. */
return (false);
case ONLINE_INDEX_CREATION:
/* The index is being created online. Log the
operation. */
row_log_online_op(index, tuple, trx_id);
break;
case ONLINE_INDEX_ABORTED:
case ONLINE_INDEX_ABORTED_DROPPED:
/* The index was created online, but the operation was
aborted. Do not log the operation and tell the caller
to skip the operation. */
break;
}

return (true);
}

/** Updates a secondary index entry of a row.
@param[in] node row update node
@param[in] old_entry the old entry to search, or nullptr then it
has to be created in this function
@param[in] thr query thread
@return DB_SUCCESS if operation successfully completed, else error
code or DB_LOCK_WAIT */
static MY_ATTRIBUTE((warn_unused_result)) dberr_t
row_upd_sec_index_entry_low(upd_node_t *node, dtuple_t *old_entry,
que_thr_t *thr) {
...
mtr_s_lock(dict_index_get_lock(index), &mtr);

switch (dict_index_get_online_status(index)) {
case ONLINE_INDEX_COMPLETE:
/* This is a normal index. Do not log anything.
Perform the update on the index tree directly. */
break;
case ONLINE_INDEX_CREATION:
/* Log a DELETE and optionally INSERT. */
row_log_online_op(index, entry, );

if (!node->is_delete) {
mem_heap_empty(heap);
entry =
row_build_index_entry(node->upd_row, node->upd_ext, index, heap);
ut_a(entry);
row_log_online_op(index, entry, trx->id);
}
/* fall through */
...

row_upd_sec_index_entry_low为对二级索引的更新场景。进一步溯源可以发现,row_log_online_op_try由二级索引的插入和删除等场景的处理函数调用。这是可以理解的,不深入分析。这样展示的是,不管那个路径进来,都是持有二级索引的锁的。这也可以理解,但似乎跟回放DML日志的流程有锁冲突。问题先抛出来,后面再分析。

从上面还可以看出,对于一个DML操作,会先写一条DELETE日志(row_log_online_op第三参数为0),如果该DML不是删除操作,那么再写一条INSERT操作。也就是说,处理DML时,删除操作仍保持为删除,插入和更新均改写为先删除再插入的形式。

(2020-3-23:这样的处理方式,不应该会导致duplicate key才对,欢迎讨论)

(2020-3-23 12:32:55,又分析了下代码,发现唯一索引还是会有问题的,如下所示:

/* Ensure that we acquire index->lock when inserting into an
index with index->online_status == ONLINE_INDEX_COMPLETE, but
could still be subject to rollback_inplace_alter_table().
This prevents a concurrent change of index->online_status.
The memory object cannot be freed as long as we have an open
reference to the table, or index->table->n_ref_count > 0. */
bool check = !index->is_committed();

DBUG_EXECUTE_IF("idx_mimic_not_committed", {
check = true;
mode = BTR_MODIFY_TREE;
});

if (check) {
DEBUG_SYNC_C("row_ins_sec_index_enter");
if (mode == BTR_MODIFY_LEAF) {
search_mode |= BTR_ALREADY_S_LATCHED;
mtr_s_lock(dict_index_get_lock(index), &mtr);
} else {
mtr_sx_lock(dict_index_get_lock(index), &mtr);
}

if (row_log_online_op_try(index, entry, thr_get_trx(thr)->id)) {
goto func_exit;
}
}
...
err = row_ins_scan_sec_index_for_duplicate(flags, index, entry, thr, check,
&mtr, offsets_heap);

mtr_commit(&mtr);

switch (err) {
case DB_SUCCESS:
break;
case DB_DUPLICATE_KEY:
if (!index->is_committed()) {
ut_ad(!thr_get_trx(thr)->dict_operation_lock_mode);

dict_set_corrupted(index);
/* Do not return any error to the
caller. The duplicate will be reported
by ALTER TABLE or CREATE UNIQUE INDEX.
Unfortunately we cannot report the
duplicate key value to the DDL thread,
because the altered_table object is
private to its call stack. */
err = DB_SUCCESS;
}
/* fall through */

先插入增量DML日志再进行唯一性约束检查,虽然err被置为DB_SUCCESS,但index被标记为corrupted,所以会导致索引的操作出错

在row_log_online_op中,由如下代码段判断增量DML是否超过了设置的innodb_online_alter_log_max_size(srv_online_max_size):

const os_offset_t byte_offset =
(os_offset_t)log->tail.blocks * srv_sort_buf_size;

if (byte_offset + srv_sort_buf_size >= srv_online_max_size) {
goto write_failed;
}


 这里引申出一个问题,在上一篇进行验证时,在sysbench oltp tps超过3k负载下,创建二级索引操作执行了约30分钟,默认最大为128M的增量日志文件竟然没有超出,这说明记录的日志量是比较有限的。

我们接着看row_log_online_op函数实现:

avail_size = srv_sort_buf_size - log->tail.bytes;

if (mrec_size > avail_size) {
b = log->tail.buf;
} else {
b = log->tail.block + log->tail.bytes;
}
...

if (mrec_size >= avail_size) {
dberr_t err;
IORequest request(IORequest::WRITE);
const os_offset_t byte_offset =
(os_offset_t)log->tail.blocks * srv_sort_buf_size;

if (byte_offset + srv_sort_buf_size >= srv_online_max_size) {
goto write_failed;
}

if (mrec_size == avail_size) {
ut_ad(b == &log->tail.block[srv_sort_buf_size]);
} else {
ut_ad(b == log->tail.buf + mrec_size);
memcpy(log->tail.block + log->tail.bytes, log->tail.buf, avail_size);
}

UNIV_MEM_ASSERT_RW(log->tail.block, srv_sort_buf_size);

if (row_log_tmpfile(log) < ) {
log->error = DB_OUT_OF_MEMORY;
goto err_exit;
}

err = os_file_write_int_fd(request, "(modification log)", log->fd,
log->tail.block, byte_offset, srv_sort_buf_size);

log->tail.blocks++;
if (err != DB_SUCCESS) {
write_failed:
/* We set the flag directly instead of
invoking dict_set_corrupted() here,
because the index is not "public" yet. */
index->type |= DICT_CORRUPT;
}
UNIV_MEM_INVALID(log->tail.block, srv_sort_buf_size);
memcpy(log->tail.block, log->tail.buf + avail_size, mrec_size - avail_size);
log->tail.bytes = mrec_size - avail_size;
} else {
log->tail.bytes += mrec_size;
ut_ad(b == log->tail.block + log->tail.bytes);
}


 当等待缓存的增量DML日志量mrec_size大于等于当前记录块的可用空间avail_size时,会触发将记录块写入临时文件的操作。如果mrec_size等于avail_size,那么直接写入当前记录块。

如果mrec_size大于avail_size,那么会将当前的DML日志先写入tail.buf字段,并拷贝DML日志前面部分到当前记录块,将其填满。再调用os_file_write_int_fd将记录块写入临时文件。

完成当前记录块写入临时文件后,把DML日志的剩余部分拷贝到已经空闲的tail.block上。

从这里我们可以确认,DML日志不会全部缓存在内存中,而是会写入到临时文件中,内存中仅保留最后一个记录块。因此不存在执行时间过长引起内存空间占用过多的问题。相对来说,临时文件磁盘空间消耗,问题会小很多,而且上面也提到,对于创建二级索引的DDL场景,产生的增量日志量还是远远少于拷贝表中全量数据这种实现方式。

增量DML回放实现分析

前面提到row_log_apply函数为日志回放的入口,而且是加了二级索引的锁的。似乎会导致回放期间DML操作阻塞,接下来就看看源码是如何处理的。

分析由row_log_apply_ops负责的具体回放操作。在该函数中,跟网上大佬分析MySQL 5.6在线加索引的实现一样的,虽然进入该函数时加了index锁,但在处理非最后一个block时,会释放锁,然后读取文件上的对应日志块并进行回放:

ut_ad(has_index_lock);
has_index_lock = false;
rw_lock_x_unlock(dict_index_get_lock(index));

log_free_check();

if (!row_log_block_allocate(index->online_log->head)) {
error = DB_OUT_OF_MEMORY;
goto func_exit;
}

IORequest request;
dberr_t err = os_file_read_no_error_handling_int_fd(
request, index->online_log->path, index->online_log->fd,
index->online_log->head.block, ofs, srv_sort_buf_size, NULL);

...

while (!trx_is_interrupted(trx)) {
mrec = next_mrec;
ut_ad(mrec < mrec_end);

if (!has_index_lock) {
/* We are applying operations from a different
block than the one that is being written to.
We do not hold index->lock in order to
allow other threads to concurrently buffer
modifications. */
ut_ad(mrec >= index->online_log->head.block);
ut_ad(mrec_end == index->online_log->head.block + srv_sort_buf_size);
ut_ad(index->online_log->head.bytes < srv_sort_buf_size);

/* Take the opportunity to do a redo log
checkpoint if needed. */
log_free_check();
} else {
/* We are applying operations from the last block.
Do not allow other threads to buffer anything,
so that we can finally catch up and synchronize. */
ut_ad(index->online_log->head.blocks == );
ut_ad(index->online_log->tail.blocks == );
ut_ad(mrec_end ==
index->online_log->tail.block + index->online_log->tail.bytes);
ut_ad(mrec >= index->online_log->tail.block);
}

next_mrec = row_log_apply_op(index, dup, &error, offsets_heap, heap,
has_index_lock, mrec, mrec_end, offsets);

在回放场景使用的是row_log_t对象的head子对象,block字段缓存从临时文件中读去的日志块,调用row_log_apply_op回放DML日志,row_log_apply_op会返回下一个DML日志的位置,因此通过while循环记录完成整个block回放。

完成该block上每条DML日志回放后,会重新加上二级索引的互斥锁,然后修改进度参数:head的blocks字段增一并将偏移量置位。通过next_block标志跳转来继续处理下一个block,如下所示:

process_next_block:
rw_lock_x_lock(dict_index_get_lock(index));
has_index_lock = true;

index->online_log->head.bytes = ;
index->online_log->head.blocks++;
goto next_block;

调整后先验证状态是否合法。接着会判断接下来处理的记录块是否为最后一个block,如果是(判断标准就是head和tail的blocks字段相同),那么已经持有的二级索引互斥锁会继续保持。

处理最后一个block时不需要从日志文件中读取block,因为最后一个block还缓存在内存中。因此,在开始处理前会先将用于缓存增量DML日志的临时文件truncate掉,避免无意义的存储资源消耗。完成所有DML日志处理后,会将返回值设置为DB_SUCCESS,并跳转到func_exit标识的代码段,进行退出row_log_apply_op前的最后处理。

next_block:
ut_ad(has_index_lock);
ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_X));
ut_ad(index->online_log->head.bytes == );

stage->inc(row_log_progress_inc_per_block());

if (trx_is_interrupted(trx)) {
goto interrupted;
}

...

if (index->online_log->head.blocks == index->online_log->tail.blocks) {
if (index->online_log->head.blocks) {
#ifdef HAVE_FTRUNCATE
/* Truncate the file in order to save space. */
if (index->online_log->fd > 0 &&
ftruncate(index->online_log->fd, 0) == -1) {
perror("ftruncate");
}
#endif /* HAVE_FTRUNCATE */
index->online_log->head.blocks = index->online_log->tail.blocks = 0;
}

next_mrec = index->online_log->tail.block;
next_mrec_end = next_mrec + index->online_log->tail.bytes;

if (next_mrec_end == next_mrec) {
/* End of log reached. */
all_done:
ut_ad(has_index_lock);
ut_ad(index->online_log->head.blocks == 0);
ut_ad(index->online_log->tail.blocks == 0);
error = DB_SUCCESS;
goto func_exit;
}
} else {


 从前面几段代码可以发现,创建二级索引时会通过trx_is_interrupted判断创建操作是否被中断,也就是说可以通过kill等方式终止创建操作。


总结

本文分析了MySQL 8.0上Online DDL功能中创建二级索引场景的增量DML处理流程,从源码层面确认了加锁时间并不是跟增量DML的数量正相关,应该说MySQL该环节的处理是比较。Online DDL是个很大的功能集,后续将通过其他文章分析索引创建的全量索引记录构造阶段。

 

0
0
写文章
戳我,来吐槽~