绑定完请刷新页面
取消
刷新

分享好友

×
取消 复制
Postgres-xl GTM(全局事务管理器 Globale Transaction Manage
2022-05-06 16:19:59
由于Postgres-xl是基于Postgres-xc的,故这里使用Postgres-xc的资料对其GTM组件进行分析。GTM 是 Postgres-XC 的一个关键组件,用于提供一致的事务管理和元组可见性控制。 首先,我们将介绍 PostgreSQL 如何管理事务和数据库更新。

在 PostgreSQL 中,每个事务都被赋予的 ID,称为事务 ID(或 XID)。 XID 按升序给出,以确定哪个事务较旧/较新。 当事务尝试读取元组时,每个元组都有一组 XID 来指示创建和删除该元组的事务。 因此,如果目标元组是由活动事务创建的,则不会提交或中止,读取事务应忽略此类元组。 以这种方式(在实践中,这是由 PostgreSQL 核心中的 tqual.c 模块完成的),如果我们在整个系统中(不仅在单个服务器中而且在所有服务器中 )给每个事务一个的事务 Id 并维护快照(哪个事务是活动的),即使服务器接受来自仅在其他服务器上运行的其他事务的新语句,我们也可以保持每个元组的全局一致可见性。

这些信息存储在表的每一行的“xmin”和“xmax”字段中。 当我们插入行时,插入事务的 XID 记录在 xmin 字段中。 当我们更新表的行(使用 UPDATE 或 DELETE 语句)时,PostgreSQL 不会简单地覆盖旧的行。 相反,PostgreSQL 通过将更新事务的 XID 写入 xmax 字段来“标记”旧行为“已删除”。 在 UPDATE(就像 INSERT)的情况下,会创建新行,其 xmin 字段被“标记”为创建事务的 XID。

这些“xmin”和“xmax”用于确定哪个行对事务可见。 为此,PostgreSQL 需要一个数据来指示特定时间正在运行哪些事务。 这称为“快照”。 如果一个事务在快照中,即使它已经完成也被认为是在运行。 你应该明白,这个特定的时间不仅仅是现在。 如果事务的隔离级别是读提交read committed,则事务需要在一段时间内保持一致的可见性,至少在执行 SQL 语句时是这样。 如果 SQL 语句读取在此执行期间提交的单行,则不是可取的。 因此,在读提交隔离级别的情况下,数据库应该在语句执行前获取快照并在整个执行过程中继续使用它。 在可重复读取和可序列化的情况下,事务需要在整个事务执行过程中保持一致的可见性。在这种情况下,事务应该在语句执行之前获取快照,并且应该在整个事务执行过程中继续使用快照,而不是单个语句执行。如果创建行的事务没有运行,则每行的可见性取决于创建事务是提交还是中止。假设一个表的行是由某个事务创建的,但尚未删除。如果创建事务正在运行,则该行对创建该行的事务可见,但对其他事务不可见。如果创建事务未运行且已提交,则行可见。如果事务被中止,则此行不可见。

因此,PostgreSQL 需要两种信息来确定“哪个事务正在运行”和“是否提交或中止了旧事务”。 前者的信息可以作为“快照”获得。 PostgreSQL 将后面的信息维护为“CLOG”。PostgreSQL 使用所有这些信息来确定哪一行对给定事务可见 - 哪个事务正在运行 --> 快照 - 是否提交或中止了旧事务 --> CLOG 事务的元数据。这种日志用于告诉PostgreSQL哪个事务已经完成、哪个还没有完成



事务管理全局化

在 Postgres-XC 中,GTM 为事务管理提供了以下功能: 1. 给事务分配全局XID(全局事务ID GXID,Global Transaction ID)。 使用 GXID,可以全局识别事务。 如果一项事务写入多个节点,我们可以跟踪此类写入。 2. 提供快照。 GTM 收集所有事务的状态(running、commited、aborted 等)以提供全局快照(全局快照)。请注意,全局快照包括给其他服务器的 GXID,如图 1.8 所示。因为一些较旧的事务可能会访问新的服务器,在这种情况下,如果快照中不包含此类事务的 GXID,则该事务可能会被视为“足够老”,并且可能会读取未提交的行。如果此类事务的 GXID 一开始就包含在快照中,这种不一致不会发生。

我们需要全局快照的原因如下: 2PC协议强制每个分布式事务的更新。但是,它并不强制保持分布式事务对其他事务更新的一致可见性。根据每个节点的提交时间,更新可能对读取这些节点的同一事务可见,也可能不可见。为了保持一致的可见性,我们需要一个全局快照,其中包含postgresxc集群中所有正在运行的事务信息(在本例中为GXID、全局事务id),并在PostgreSQL中找到的读取操作的相同上下文中使用它。

为此,postgresxc引入了一个名为GTM(global transaction manager)的专用组件。GTM作为一个单独的组件运行,并为postgresxc服务器上运行的每个事务提供且有序的事务id。我们称之为GXID(全局事务Id),因为这是全局的Id, GTM从事务接收GXID请求并提供GXID。它还跟踪所有事务的开始和结束时间,以生成用于控制每个元组可见性的快照。因为这里的快照也是全局属性,所以称为全局快照。只要每个事务都使用GXID和Global Snapshot运行,它就可以在整个系统中保持一致的可见性,并且在任何服务器上并行运行事务都是安全的。

另一方面,一个由多个语句组成的事务可以使用多个服务器来执行,保持一致的更新和可见性。这一机制的概要如图1.9所示。请注意每个快照中包含的事务是如何根据全局事务进行更改的。

GTM为每个事务提供全局事务Id,并跟踪所有事务的状态,无论事务是正在运行、已提交还是已中止,以计算全局快照以保持元组可见性。 请注意,每个事务在开始和结束时以及在两阶段提交协议中发出PREPARE transaction命令时都会报告。还请注意,GTM提供的全局快照包括在其他组件上运行的其他事务。

每个事务根据PostgreSQL中的事务隔离级别请求快照。如果事务隔离级别为“read committed”,则事务将为每个语句请求一个快照。如果事务隔离级别为“repeatable read”,则事务将在事务开始时请求一个快照,并在整个事务中重用它。 GTM还提供全局值,例如sequence。其他全局属性(如时间戳和通知)将是以下版本中的一个扩展。

关键组件间的交互

如前一节所述,postgresxc有三个主要组件来提供多节点读写的全局一致性,并确定每条语句应该转到哪个datanode和处理该语句。 图1.11给出了postgresxc组件之间的全局事务控制和交互顺序。 如图所示,当协调器开始一个新事务时,它向GTM请求新的事务ID(GXID,global transaction ID)。GTM跟踪这些需求以计算全局快照。


如果事务隔离模式是重复读取,则将获取快照并在整个事务中使用。当协调器coordinator接受来自应用程序的语句并且隔离模式为READ COMMITTED时,将从GTM获取快照。然后分析语句,确定要转到哪个datanode,并在必要时为每个datanode进行转换。 请注意,语句将通过GXID和global snapshot传递到适当的datanode,以维护全局事务标识和表的每行的可见性。每个结果都将被收集并计算到对应用程序的响应中。 在事务结束时,如果事务中的更新涉及多个datanode,协调器将为2PC发出PREPARE transaction,然后发出COMMIT。这些步骤将报告给GTM,并跟踪每个事务状态,以便计算后续全局快照。

GTM的角色

GTM 有四种角色:Master、Slave、Proxy和client。如下是这些角色的配置文件的示例。


GTM Master

图1.11中的顺序,可以如图1.12所示实现。协调器Coordinator后端对应于PostgreSQL的后端进程,它处理来自应用程序的数据库连接并处理每个事务。 结构和算法概述如下: 1. Coordinator后端提供GTM客户端库,获取GXID和快照并报告事务状态。 2. GTM打开一个端口以接受来自每个协调器Coordinator后端的连接。当GTM接受连接时,它会创建一个线程(GTM thread)来处理从连接的协调器后端到GTM的请求。 3. GTM线程接收每个请求,记录并将GXID、快照和其他响应返回给协调器Coordinator后端。 4. 重复上述顺序,直到协调器后端请求断开连接。

这里看一个重要流程(当GTM接受连接时,它会创建一个线程GTM thread来处理从连接的协调器后端到GTM的请求)的代码: postgres-xl-10r1.1 gtm/main/main.c中的main函数第838行的ServerLoop函数,其主要结构是一个for(;;)死循环,用于等待连接的到来,多等待一分钟,保证其他后台有时间处理。这里主要看该死循环中的所进行的步骤: 1. 允许所有信号 PG_SETMASK(&UnBlockSig) 2. 判断是否退出GTM程序,判断标识GTMAbortPending 3. GTM_RWLockRelease,释放锁 Now GTM-Standby can backup current status during this region 4. selres = select(nSockets, &rmask, NULL, NULL, &timeout) 使用select接收连接 5. GTM_RWLockAcquire,获取锁 Prohibit GTM-Standby backup from here. 6. 阻塞所有信号 PG_SETMASK(&BlockSig) 7. 检测select函数调用结果 8. 如果有新连接连接进来,则fork一个GTM thread子进程进行处理

static int ServerLoop(void) {
    fd_set      readmask;
    int         nSockets;
    nSockets = initMasks(&readmask);
    for (;;)
    {
        fd_set      rmask;
        int         selres;
        //MemoryContextStats(TopMostMemoryContext);
        /* Wait for a connection request to arrive. We wait at most one minute, to ensure that the other background
         * tasks handled below get done even when no requests are arriving. */
        memcpy((char *) &rmask, (char *) &readmask, sizeof(fd_set));
        PG_SETMASK(&UnBlockSig);
        if (GTMAbortPending) {
            /* XXX We should do a clean shutdown here. For the time being, just write the next GXID to be issued in the control file and exit gracefully */
            elog(LOG, "GTM shutting down.");
            /* Tell GTM that we are shutting down so that no new GXIDs are issued this point onwards */
            GTM_SetShuttingDown();
            SaveControlInfo();
            exit(1);
        }
        {
            /* must set timeout each time; some OSes change it! */
            struct timeval timeout;
            GTM_ThreadInfo *my_threadinfo = GetMyThreadInfo;
            timeout.tv_sec = 60;
            timeout.tv_usec = 0;
            /* Now GTM-Standby can backup current status during this region */
            GTM_RWLockRelease(&my_threadinfo->thr_lock);
            selres = select(nSockets, &rmask, NULL, NULL, &timeout);
            /* Prohibit GTM-Standby backup from here. */
            GTM_RWLockAcquire(&my_threadinfo->thr_lock, GTM_LOCKMODE_WRITE);
        }

        /* Block all signals until we wait again.  (This makes it safe for our signal handlers to do nontrivial work.) */
        PG_SETMASK(&BlockSig);
        /* Now check the select() result */
        if (selres < 0){
            if (errno != EINTR && errno != EWOULDBLOCK){
                ereport(LOG,(EACCES,errmsg("select() failed in main thread: %m")));
                return STATUS_ERROR;
            }
        }
        /* New connection pending on any of our sockets? If so, fork a child process to deal with it. */
        if (selres > 0){
            int         i;
            for (i = 0; i < MAXLISTEN; i++){
                if (ListenSocket[i] == -1)
                    break;
                if (FD_ISSET(ListenSocket[i], &rmask)){
                    Port       *port;
                    port = ConnCreate(ListenSocket[i]);
                    if (port){
                        GTM_Conn *standby = NULL;
                        standby = gtm_standby_connect_to_standby();
                        if (GTMAddConnection(port, standby) != STATUS_OK){
                            gtm_standby_disconnect_from_standby(standby);
                            StreamClose(port->sock);
                            ConnFree(port);
                        }
                    }
                }
            }
        }
    }
}

这里主要看有新连接连接进来,则fork一个GTM thread子进程进行处理的程序代码。由于这里面有GTM slave处理的代码,先不看这里的逻辑,关注GTMAddConnection函数。

从下面代码中可以看出GTMAddConnection调用GTM_ThreadCreate函数创建新的进程处理连接。

static int GTMAddConnection(Port *port, GTM_Conn *standby) {
    GTM_ConnectionInfo *conninfo = NULL;
    conninfo = (GTM_ConnectionInfo *)palloc0(sizeof (GTM_ConnectionInfo));
    elog(DEBUG3, "Started new connection");
    conninfo->con_port = port;
    /* Add a connection to the standby. */
    if (standby != NULL)
        conninfo->standby = standby;
    /* XXX Start the thread */
    if (GTM_ThreadCreate(conninfo, GTM_ThreadMain) == NULL)
        return STATUS_ERROR;
    return STATUS_OK;
}

上述每种交互都是单独完成的。例如,如果协调器的数量是10个,每个协调器有100个来自应用程序的连接,这在事务应用程序中的单个PostgreSQL中是相当合理的,那么GTM必须有1000个GTM线程。如果每个后端在一秒钟内发出25个事务,并且每个事务包含5个语句,并且每个协调器运行100个后端,那么GTM和10个协调器之间提供全局快照的交互总数可以估计为:10× 100× 25× 5 = 125000; 因为我们在每个协调器中有100个后端,所以快照的长度(GXID是32位整数,在PostgreSQL中定义)将是4× 100× 10=4000字节。因此,GTM必须在一秒钟内发送大约600兆字节的数据才能支持这种规模。它是相当大的千兆网络可以支持14。实际上,从GTM发送的数据量的顺序是O($N^2$),其中N是协调器的数量。 问题不仅在于数据量。互动的次数是个问题。非常简单的测试将显示,千兆网络提供多达100;每台服务器有1000次交互。 后面的网络负载测量显示数据量并不是很大,但是很明显我们需要一些方法来减少交互和数据量。

GTM Slave


摘录上节涉及的代码,新建连接涉及standby逻辑的处理部分来进行分析。

typedef struct Port {
    int         sock;               /* File descriptor */
    SockAddr    laddr;              /* local addr (postmaster) */
    SockAddr    raddr;              /* remote addr (client) */
    char        *remote_host;       /* name (or ip addr) of remote host */
    char        *remote_port;       /* text rep of remote port */
    GTM_PortLastCall last_call;     /* Last syscall to this port */
    int         last_errno;         /* Last errno. zero if the last call succeeds */
    GTMProxy_ConnID conn_id;        /* RequestID of this command */
    GTM_PGXCNodeType    remote_type;    /* Type of remote connection */
    char        *node_name;
    bool        is_postmaster;      /* Is remote a node postmaster? */
#define PQ_BUFFER_SIZE 8192
    char        PqSendBuffer[PQ_BUFFER_SIZE];
    int         PqSendPointer;      /* Next index to store a byte in PqSendBuffer */
    char        PqRecvBuffer[PQ_BUFFER_SIZE];
    int         PqRecvPointer;      /* Next index to read a byte from PqRecvBuffer */
    int         PqRecvLength;       /* End of data available in PqRecvBuffer */
    /* TCP keepalive settings.
     * default values are 0 if AF_UNIX or not yet known; current values are 0
     * if AF_UNIX or using the default. Also, -1 in a default value means we
     * were unable to find out the default (getsockopt failed). */
    int         default_keepalives_idle;
    int         default_keepalives_interval;
    int         default_keepalives_count;
    int         keepalives_idle;
    int         keepalives_interval;
    int         keepalives_count;
    /* GTM communication error handling.  See libpq-int.h for details. */
    int         connErr_WaitOpt;
    int         connErr_WaitInterval;
    int         connErr_WaitCount;
} Port;

struct gtm_conn {
    /* Saved values of connection options */
    char        *pghost;            /* the machine on which the server is running */
    char        *pghostaddr;        /* the IPv4 address of the machine on which the server is running, in IPv4 numbers-and-dots notation. Takes precedence over above. */
    char        *pgport;            /* the server's communication port */
    char        *connect_timeout;   /* connection timeout (numeric string) */
    int          comm_timeout;      /* communication timeout, 0 means infinite */
    char        *gc_node_name;      /* PGXC Node Name */
    int         remote_type;        /* is this a connection to/from a proxy ? */
    int         is_postmaster;      /* is this connection to/from a postmaster instance */
    uint32      my_id;              /* unique identifier issued to us by GTM */
    /* Optional file to write trace info to */
    FILE        *Pfdebug;
    /* Status indicators */
    ConnStatusType  status;
    /* Connection data */
    int         sock;           /* Unix FD for socket, -1 if not connected */
    SockAddr    laddr;          /* Local address */
    SockAddr    raddr;          /* Remote address */
    /* Error info for GTM communication */
    GTM_PortLastCall    last_call;  /* Last syscall to this sock. */
    int                 last_errno; /* Last errno.  zero if the last call succeeds. */
    /* Transient state needed while establishing connection */
    struct addrinfo *addrlist;  /* list of possible backend addresses */
    struct addrinfo *addr_cur;  /* the one currently being tried */
    int     addrlist_family;    /* needed to know how to free addrlist */
    /* Buffer for data received from backend and not yet processed */
    char    *inBuffer;      /* currently allocated buffer */
    int     inBufSize;      /* allocated size of buffer */
    int     inStart;        /* offset to first unconsumed data in buffer */
    int     inCursor;       /* next byte to tentatively consume */
    int     inEnd;          /* offset to first position after avail data */
    /* Buffer for data not yet sent to backend */
    char    *outBuffer;     /* currently allocated buffer */
    int     outBufSize;     /* allocated size of buffer */
    int     outCount;       /* number of chars waiting in buffer */
    /* State for constructing messages in outBuffer */
    int     outMsgStart;    /* offset to msg start (length word); if -1, msg has no length word */
    int     outMsgEnd;      /* offset to msg end (so far) */
    /* Buffer for current error message */
    PQExpBufferData errorMessage;       /* expansible string */
    /* Buffer for receiving various parts of messages */
    PQExpBufferData workBuffer; /* expansible string */
    /* Pointer to the result of last operation */
    GTM_Result  *result;
};
typedef struct gtm_conn GTM_Conn;

// ServerLoop函数涉及新建连接
Port       *port;
port = ConnCreate(ListenSocket[i]);
if (port){
    GTM_Conn *standby = NULL;
    standby = gtm_standby_connect_to_standby();
    if (GTMAddConnection(port, standby) != STATUS_OK){
        gtm_standby_disconnect_from_standby(standby);
        StreamClose(port->sock);
        ConnFree(port);

GTM Proxy

您可能已经注意到,每个事务都如此频繁地向GTM发出请求,我们可以将它们收集到每个协调器中的单个请求块中,以减少交互量。这就是GTM代理实现的思想,如图1.13所示。


在此配置中,每个协调器后端都不直接连接到GTM。相反,我们在GTM和协调器后端之间使用GTM代理来对多个请求和响应进行分组。GTM代理,就像1.5.1节中解释的GTM一样,接受来自协调器后端的连接。但是,它不会创建新线程。下面的段落解释如何初始化GTM代理以及它如何处理来自协调器后端的请求。 GTM代理和GTM的初始化如下: 1.GTM启动如第1.5.1节所述。现在GTM可以接受来自GTM代理的连接。 2.GTM代理启动。GTM代理创建GTM代理线程。每个GTM代理线程提前连接到GTM。可以在启动时指定GTM代理线程的数量。典型的线程数是一个或两个,这样可以节省GTM和协调器之间的连接数。 3.GTM主线程等待来自每个后端的请求连接。

当每个协调器后端请求连接时,代理主线程分配一个GTM代理线程来处理请求。因此,一个GTM代理线程负责多个协调器后端。如果一个协调器有一百个协调器后端和一个GTM代理线程,那么这个线程负责一百个协调器后端。 然后GTM代理线程扫描来自协调器后端的所有请求。如果协调器更忙,那么它将在一次扫描中捕获更多的请求。因此,代理可以将多个请求分组为单个请求块,以减少GTM与协调器之间的交互次数。 此外,在一次扫描中,可能会有多个快照请求。因为这些请求可以被视为同时接收,所以我们可以用一个快照来表示多个快照。这将减少GTM提供的数据量。 测试结果将在后面给出,但是可以观察到GTM代理至少适用于20个协调器,例如DBT-1。在GTM代理结构中,对交互顺序和数据量的估计并不简单。当Postgres XC的工作量很小时,交互将与第1.5.1节中的情况相同。另一方面,当工作负载较重时,预期数据量将小于O($N^2$),并且交互次数将小于O(N)。

GTM代码概览

Postgres-xc中GTM进程和postmaster进程是相互独立,拥有独立的二进制文件,配置文件,log文件和pid文件,并且和postmaster是分开启动的。src/include/gtm是GTM函数的头文件,src/gtm/client是GTM client的库函数,src/gtm/config是配置文件的扫描函数,src/gtm/main是GTM主程序,src/gtm/proxy是GTM Proxy主程序,src/gtm/recovery是PGXC节点在GTM和Proxy上的注册函数以及GTM standby全局变量的工具函数,src/gtm/common是GTM、GTM Proxy、GTM client的通用函数,src/gtm/libpq是libpq协议函数,src/gtm/path是path处理函数。

Postgres-xl的GTM源码结构如下所示


http://postgres-x2.github.io/developer.html

来源 https://zhuanlan.zhihu.com/p/473846708
分享好友

分享这个小栈给你的朋友们,一起进步吧。

Postgres-XL
创建时间:2022-02-18 17:49:48
Postgres-XL
展开
订阅须知

• 所有用户可根据关注领域订阅专区或所有专区

• 付费订阅:虚拟交易,一经交易不退款;若特殊情况,可3日内客服咨询

• 专区发布评论属默认订阅所评论专区(除付费小栈外)

技术专家

查看更多
  • itt0918
    专家
戳我,来吐槽~