在 PostgreSQL 中,每个事务都被赋予的 ID,称为事务 ID(或 XID)。 XID 按升序给出,以确定哪个事务较旧/较新。 当事务尝试读取元组时,每个元组都有一组 XID 来指示创建和删除该元组的事务。 因此,如果目标元组是由活动事务创建的,则不会提交或中止,读取事务应忽略此类元组。 以这种方式(在实践中,这是由 PostgreSQL 核心中的 tqual.c 模块完成的),如果我们在整个系统中(不仅在单个服务器中而且在所有服务器中 )给每个事务一个的事务 Id 并维护快照(哪个事务是活动的),即使服务器接受来自仅在其他服务器上运行的其他事务的新语句,我们也可以保持每个元组的全局一致可见性。
这些信息存储在表的每一行的“xmin”和“xmax”字段中。 当我们插入行时,插入事务的 XID 记录在 xmin 字段中。 当我们更新表的行(使用 UPDATE 或 DELETE 语句)时,PostgreSQL 不会简单地覆盖旧的行。 相反,PostgreSQL 通过将更新事务的 XID 写入 xmax 字段来“标记”旧行为“已删除”。 在 UPDATE(就像 INSERT)的情况下,会创建新行,其 xmin 字段被“标记”为创建事务的 XID。
这些“xmin”和“xmax”用于确定哪个行对事务可见。 为此,PostgreSQL 需要一个数据来指示特定时间正在运行哪些事务。 这称为“快照”。 如果一个事务在快照中,即使它已经完成也被认为是在运行。 你应该明白,这个特定的时间不仅仅是现在。 如果事务的隔离级别是读提交read committed,则事务需要在一段时间内保持一致的可见性,至少在执行 SQL 语句时是这样。 如果 SQL 语句读取在此执行期间提交的单行,则不是可取的。 因此,在读提交隔离级别的情况下,数据库应该在语句执行前获取快照并在整个执行过程中继续使用它。 在可重复读取和可序列化的情况下,事务需要在整个事务执行过程中保持一致的可见性。在这种情况下,事务应该在语句执行之前获取快照,并且应该在整个事务执行过程中继续使用快照,而不是单个语句执行。如果创建行的事务没有运行,则每行的可见性取决于创建事务是提交还是中止。假设一个表的行是由某个事务创建的,但尚未删除。如果创建事务正在运行,则该行对创建该行的事务可见,但对其他事务不可见。如果创建事务未运行且已提交,则行可见。如果事务被中止,则此行不可见。
因此,PostgreSQL 需要两种信息来确定“哪个事务正在运行”和“是否提交或中止了旧事务”。 前者的信息可以作为“快照”获得。 PostgreSQL 将后面的信息维护为“CLOG”。PostgreSQL 使用所有这些信息来确定哪一行对给定事务可见 - 哪个事务正在运行 --> 快照 - 是否提交或中止了旧事务 --> CLOG 事务的元数据。这种日志用于告诉PostgreSQL哪个事务已经完成、哪个还没有完成
事务管理全局化
在 Postgres-XC 中,GTM 为事务管理提供了以下功能: 1. 给事务分配全局XID(全局事务ID GXID,Global Transaction ID)。 使用 GXID,可以全局识别事务。 如果一项事务写入多个节点,我们可以跟踪此类写入。 2. 提供快照。 GTM 收集所有事务的状态(running、commited、aborted 等)以提供全局快照(全局快照)。请注意,全局快照包括给其他服务器的 GXID,如图 1.8 所示。因为一些较旧的事务可能会访问新的服务器,在这种情况下,如果快照中不包含此类事务的 GXID,则该事务可能会被视为“足够老”,并且可能会读取未提交的行。如果此类事务的 GXID 一开始就包含在快照中,这种不一致不会发生。
我们需要全局快照的原因如下: 2PC协议强制每个分布式事务的更新。但是,它并不强制保持分布式事务对其他事务更新的一致可见性。根据每个节点的提交时间,更新可能对读取这些节点的同一事务可见,也可能不可见。为了保持一致的可见性,我们需要一个全局快照,其中包含postgresxc集群中所有正在运行的事务信息(在本例中为GXID、全局事务id),并在PostgreSQL中找到的读取操作的相同上下文中使用它。
为此,postgresxc引入了一个名为GTM(global transaction manager)的专用组件。GTM作为一个单独的组件运行,并为postgresxc服务器上运行的每个事务提供且有序的事务id。我们称之为GXID(全局事务Id),因为这是全局的Id, GTM从事务接收GXID请求并提供GXID。它还跟踪所有事务的开始和结束时间,以生成用于控制每个元组可见性的快照。因为这里的快照也是全局属性,所以称为全局快照。只要每个事务都使用GXID和Global Snapshot运行,它就可以在整个系统中保持一致的可见性,并且在任何服务器上并行运行事务都是安全的。
另一方面,一个由多个语句组成的事务可以使用多个服务器来执行,保持一致的更新和可见性。这一机制的概要如图1.9所示。请注意每个快照中包含的事务是如何根据全局事务进行更改的。
GTM为每个事务提供全局事务Id,并跟踪所有事务的状态,无论事务是正在运行、已提交还是已中止,以计算全局快照以保持元组可见性。 请注意,每个事务在开始和结束时以及在两阶段提交协议中发出PREPARE transaction命令时都会报告。还请注意,GTM提供的全局快照包括在其他组件上运行的其他事务。
每个事务根据PostgreSQL中的事务隔离级别请求快照。如果事务隔离级别为“read committed”,则事务将为每个语句请求一个快照。如果事务隔离级别为“repeatable read”,则事务将在事务开始时请求一个快照,并在整个事务中重用它。 GTM还提供全局值,例如sequence。其他全局属性(如时间戳和通知)将是以下版本中的一个扩展。
关键组件间的交互
如前一节所述,postgresxc有三个主要组件来提供多节点读写的全局一致性,并确定每条语句应该转到哪个datanode和处理该语句。 图1.11给出了postgresxc组件之间的全局事务控制和交互顺序。 如图所示,当协调器开始一个新事务时,它向GTM请求新的事务ID(GXID,global transaction ID)。GTM跟踪这些需求以计算全局快照。
如果事务隔离模式是重复读取,则将获取快照并在整个事务中使用。当协调器coordinator接受来自应用程序的语句并且隔离模式为READ COMMITTED时,将从GTM获取快照。然后分析语句,确定要转到哪个datanode,并在必要时为每个datanode进行转换。 请注意,语句将通过GXID和global snapshot传递到适当的datanode,以维护全局事务标识和表的每行的可见性。每个结果都将被收集并计算到对应用程序的响应中。 在事务结束时,如果事务中的更新涉及多个datanode,协调器将为2PC发出PREPARE transaction,然后发出COMMIT。这些步骤将报告给GTM,并跟踪每个事务状态,以便计算后续全局快照。
GTM的角色
GTM 有四种角色:Master、Slave、Proxy和client。如下是这些角色的配置文件的示例。
GTM Master
图1.11中的顺序,可以如图1.12所示实现。协调器Coordinator后端对应于PostgreSQL的后端进程,它处理来自应用程序的数据库连接并处理每个事务。 结构和算法概述如下: 1. Coordinator后端提供GTM客户端库,获取GXID和快照并报告事务状态。 2. GTM打开一个端口以接受来自每个协调器Coordinator后端的连接。当GTM接受连接时,它会创建一个线程(GTM thread)来处理从连接的协调器后端到GTM的请求。 3. GTM线程接收每个请求,记录并将GXID、快照和其他响应返回给协调器Coordinator后端。 4. 重复上述顺序,直到协调器后端请求断开连接。
这里看一个重要流程(当GTM接受连接时,它会创建一个线程GTM thread来处理从连接的协调器后端到GTM的请求)的代码: postgres-xl-10r1.1 gtm/main/main.c中的main函数第838行的ServerLoop函数,其主要结构是一个for(;;)死循环,用于等待连接的到来,多等待一分钟,保证其他后台有时间处理。这里主要看该死循环中的所进行的步骤: 1. 允许所有信号 PG_SETMASK(&UnBlockSig) 2. 判断是否退出GTM程序,判断标识GTMAbortPending 3. GTM_RWLockRelease,释放锁 Now GTM-Standby can backup current status during this region 4. selres = select(nSockets, &rmask, NULL, NULL, &timeout) 使用select接收连接 5. GTM_RWLockAcquire,获取锁 Prohibit GTM-Standby backup from here. 6. 阻塞所有信号 PG_SETMASK(&BlockSig) 7. 检测select函数调用结果 8. 如果有新连接连接进来,则fork一个GTM thread子进程进行处理
static int ServerLoop(void) {
fd_set readmask;
int nSockets;
nSockets = initMasks(&readmask);
for (;;)
{
fd_set rmask;
int selres;
//MemoryContextStats(TopMostMemoryContext);
/* Wait for a connection request to arrive. We wait at most one minute, to ensure that the other background
* tasks handled below get done even when no requests are arriving. */
memcpy((char *) &rmask, (char *) &readmask, sizeof(fd_set));
PG_SETMASK(&UnBlockSig);
if (GTMAbortPending) {
/* XXX We should do a clean shutdown here. For the time being, just write the next GXID to be issued in the control file and exit gracefully */
elog(LOG, "GTM shutting down.");
/* Tell GTM that we are shutting down so that no new GXIDs are issued this point onwards */
GTM_SetShuttingDown();
SaveControlInfo();
exit(1);
}
{
/* must set timeout each time; some OSes change it! */
struct timeval timeout;
GTM_ThreadInfo *my_threadinfo = GetMyThreadInfo;
timeout.tv_sec = 60;
timeout.tv_usec = 0;
/* Now GTM-Standby can backup current status during this region */
GTM_RWLockRelease(&my_threadinfo->thr_lock);
selres = select(nSockets, &rmask, NULL, NULL, &timeout);
/* Prohibit GTM-Standby backup from here. */
GTM_RWLockAcquire(&my_threadinfo->thr_lock, GTM_LOCKMODE_WRITE);
}
/* Block all signals until we wait again. (This makes it safe for our signal handlers to do nontrivial work.) */
PG_SETMASK(&BlockSig);
/* Now check the select() result */
if (selres < 0){
if (errno != EINTR && errno != EWOULDBLOCK){
ereport(LOG,(EACCES,errmsg("select() failed in main thread: %m")));
return STATUS_ERROR;
}
}
/* New connection pending on any of our sockets? If so, fork a child process to deal with it. */
if (selres > 0){
int i;
for (i = 0; i < MAXLISTEN; i++){
if (ListenSocket[i] == -1)
break;
if (FD_ISSET(ListenSocket[i], &rmask)){
Port *port;
port = ConnCreate(ListenSocket[i]);
if (port){
GTM_Conn *standby = NULL;
standby = gtm_standby_connect_to_standby();
if (GTMAddConnection(port, standby) != STATUS_OK){
gtm_standby_disconnect_from_standby(standby);
StreamClose(port->sock);
ConnFree(port);
}
}
}
}
}
}
}
这里主要看有新连接连接进来,则fork一个GTM thread子进程进行处理的程序代码。由于这里面有GTM slave处理的代码,先不看这里的逻辑,关注GTMAddConnection函数。
从下面代码中可以看出GTMAddConnection调用GTM_ThreadCreate函数创建新的进程处理连接。
static int GTMAddConnection(Port *port, GTM_Conn *standby) {
GTM_ConnectionInfo *conninfo = NULL;
conninfo = (GTM_ConnectionInfo *)palloc0(sizeof (GTM_ConnectionInfo));
elog(DEBUG3, "Started new connection");
conninfo->con_port = port;
/* Add a connection to the standby. */
if (standby != NULL)
conninfo->standby = standby;
/* XXX Start the thread */
if (GTM_ThreadCreate(conninfo, GTM_ThreadMain) == NULL)
return STATUS_ERROR;
return STATUS_OK;
}
上述每种交互都是单独完成的。例如,如果协调器的数量是10个,每个协调器有100个来自应用程序的连接,这在事务应用程序中的单个PostgreSQL中是相当合理的,那么GTM必须有1000个GTM线程。如果每个后端在一秒钟内发出25个事务,并且每个事务包含5个语句,并且每个协调器运行100个后端,那么GTM和10个协调器之间提供全局快照的交互总数可以估计为:10× 100× 25× 5 = 125000; 因为我们在每个协调器中有100个后端,所以快照的长度(GXID是32位整数,在PostgreSQL中定义)将是4× 100× 10=4000字节。因此,GTM必须在一秒钟内发送大约600兆字节的数据才能支持这种规模。它是相当大的千兆网络可以支持14。实际上,从GTM发送的数据量的顺序是O($N^2$),其中N是协调器的数量。 问题不仅在于数据量。互动的次数是个问题。非常简单的测试将显示,千兆网络提供多达100;每台服务器有1000次交互。 后面的网络负载测量显示数据量并不是很大,但是很明显我们需要一些方法来减少交互和数据量。
GTM Slave
摘录上节涉及的代码,新建连接涉及standby逻辑的处理部分来进行分析。
typedef struct Port {
int sock; /* File descriptor */
SockAddr laddr; /* local addr (postmaster) */
SockAddr raddr; /* remote addr (client) */
char *remote_host; /* name (or ip addr) of remote host */
char *remote_port; /* text rep of remote port */
GTM_PortLastCall last_call; /* Last syscall to this port */
int last_errno; /* Last errno. zero if the last call succeeds */
GTMProxy_ConnID conn_id; /* RequestID of this command */
GTM_PGXCNodeType remote_type; /* Type of remote connection */
char *node_name;
bool is_postmaster; /* Is remote a node postmaster? */
#define PQ_BUFFER_SIZE 8192
char PqSendBuffer[PQ_BUFFER_SIZE];
int PqSendPointer; /* Next index to store a byte in PqSendBuffer */
char PqRecvBuffer[PQ_BUFFER_SIZE];
int PqRecvPointer; /* Next index to read a byte from PqRecvBuffer */
int PqRecvLength; /* End of data available in PqRecvBuffer */
/* TCP keepalive settings.
* default values are 0 if AF_UNIX or not yet known; current values are 0
* if AF_UNIX or using the default. Also, -1 in a default value means we
* were unable to find out the default (getsockopt failed). */
int default_keepalives_idle;
int default_keepalives_interval;
int default_keepalives_count;
int keepalives_idle;
int keepalives_interval;
int keepalives_count;
/* GTM communication error handling. See libpq-int.h for details. */
int connErr_WaitOpt;
int connErr_WaitInterval;
int connErr_WaitCount;
} Port;
struct gtm_conn {
/* Saved values of connection options */
char *pghost; /* the machine on which the server is running */
char *pghostaddr; /* the IPv4 address of the machine on which the server is running, in IPv4 numbers-and-dots notation. Takes precedence over above. */
char *pgport; /* the server's communication port */
char *connect_timeout; /* connection timeout (numeric string) */
int comm_timeout; /* communication timeout, 0 means infinite */
char *gc_node_name; /* PGXC Node Name */
int remote_type; /* is this a connection to/from a proxy ? */
int is_postmaster; /* is this connection to/from a postmaster instance */
uint32 my_id; /* unique identifier issued to us by GTM */
/* Optional file to write trace info to */
FILE *Pfdebug;
/* Status indicators */
ConnStatusType status;
/* Connection data */
int sock; /* Unix FD for socket, -1 if not connected */
SockAddr laddr; /* Local address */
SockAddr raddr; /* Remote address */
/* Error info for GTM communication */
GTM_PortLastCall last_call; /* Last syscall to this sock. */
int last_errno; /* Last errno. zero if the last call succeeds. */
/* Transient state needed while establishing connection */
struct addrinfo *addrlist; /* list of possible backend addresses */
struct addrinfo *addr_cur; /* the one currently being tried */
int addrlist_family; /* needed to know how to free addrlist */
/* Buffer for data received from backend and not yet processed */
char *inBuffer; /* currently allocated buffer */
int inBufSize; /* allocated size of buffer */
int inStart; /* offset to first unconsumed data in buffer */
int inCursor; /* next byte to tentatively consume */
int inEnd; /* offset to first position after avail data */
/* Buffer for data not yet sent to backend */
char *outBuffer; /* currently allocated buffer */
int outBufSize; /* allocated size of buffer */
int outCount; /* number of chars waiting in buffer */
/* State for constructing messages in outBuffer */
int outMsgStart; /* offset to msg start (length word); if -1, msg has no length word */
int outMsgEnd; /* offset to msg end (so far) */
/* Buffer for current error message */
PQExpBufferData errorMessage; /* expansible string */
/* Buffer for receiving various parts of messages */
PQExpBufferData workBuffer; /* expansible string */
/* Pointer to the result of last operation */
GTM_Result *result;
};
typedef struct gtm_conn GTM_Conn;
// ServerLoop函数涉及新建连接
Port *port;
port = ConnCreate(ListenSocket[i]);
if (port){
GTM_Conn *standby = NULL;
standby = gtm_standby_connect_to_standby();
if (GTMAddConnection(port, standby) != STATUS_OK){
gtm_standby_disconnect_from_standby(standby);
StreamClose(port->sock);
ConnFree(port);
GTM Proxy
您可能已经注意到,每个事务都如此频繁地向GTM发出请求,我们可以将它们收集到每个协调器中的单个请求块中,以减少交互量。这就是GTM代理实现的思想,如图1.13所示。
在此配置中,每个协调器后端都不直接连接到GTM。相反,我们在GTM和协调器后端之间使用GTM代理来对多个请求和响应进行分组。GTM代理,就像1.5.1节中解释的GTM一样,接受来自协调器后端的连接。但是,它不会创建新线程。下面的段落解释如何初始化GTM代理以及它如何处理来自协调器后端的请求。 GTM代理和GTM的初始化如下: 1.GTM启动如第1.5.1节所述。现在GTM可以接受来自GTM代理的连接。 2.GTM代理启动。GTM代理创建GTM代理线程。每个GTM代理线程提前连接到GTM。可以在启动时指定GTM代理线程的数量。典型的线程数是一个或两个,这样可以节省GTM和协调器之间的连接数。 3.GTM主线程等待来自每个后端的请求连接。
当每个协调器后端请求连接时,代理主线程分配一个GTM代理线程来处理请求。因此,一个GTM代理线程负责多个协调器后端。如果一个协调器有一百个协调器后端和一个GTM代理线程,那么这个线程负责一百个协调器后端。 然后GTM代理线程扫描来自协调器后端的所有请求。如果协调器更忙,那么它将在一次扫描中捕获更多的请求。因此,代理可以将多个请求分组为单个请求块,以减少GTM与协调器之间的交互次数。 此外,在一次扫描中,可能会有多个快照请求。因为这些请求可以被视为同时接收,所以我们可以用一个快照来表示多个快照。这将减少GTM提供的数据量。 测试结果将在后面给出,但是可以观察到GTM代理至少适用于20个协调器,例如DBT-1。在GTM代理结构中,对交互顺序和数据量的估计并不简单。当Postgres XC的工作量很小时,交互将与第1.5.1节中的情况相同。另一方面,当工作负载较重时,预期数据量将小于O($N^2$),并且交互次数将小于O(N)。
GTM代码概览
Postgres-xc中GTM进程和postmaster进程是相互独立,拥有独立的二进制文件,配置文件,log文件和pid文件,并且和postmaster是分开启动的。src/include/gtm是GTM函数的头文件,src/gtm/client是GTM client的库函数,src/gtm/config是配置文件的扫描函数,src/gtm/main是GTM主程序,src/gtm/proxy是GTM Proxy主程序,src/gtm/recovery是PGXC节点在GTM和Proxy上的注册函数以及GTM standby全局变量的工具函数,src/gtm/common是GTM、GTM Proxy、GTM client的通用函数,src/gtm/libpq是libpq协议函数,src/gtm/path是path处理函数。
Postgres-xl的GTM源码结构如下所示