blocked.c 是阻塞(block)操作的底层实现文件,用一句话概括就是:
/* blocked.c - generic support for blocking operations like BLPOP & WAIT.
*/
这个文件提供了一些API,供外部调用:
1. getTimeoutFromObjectOrReply(),在object当中获取timeout参数,具体如下
/* Parse a blocking-command timeout from 'object' and store it in '*timeout'.
 *
 * The value is read as a long long. When 'unit' is UNIT_SECONDS it is
 * converted to milliseconds; any other unit is used as-is. A value of zero
 * is stored unchanged, while a positive value is turned into an absolute
 * time by adding the current mstime().
 *
 * Returns C_OK on success. On failure an error reply is sent to client 'c',
 * '*timeout' is left untouched and C_ERR is returned. Failures are:
 *   - the object is not an integer or is out of range,
 *   - the value is negative,
 *   - the resulting absolute time would not fit in a long long.
 *
 * NOTE(review): LLONG_MAX comes from <limits.h>; this assumes the header is
 * already reachable through the project's includes -- confirm. */
int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit) {
    long long tval;

    if (getLongLongFromObjectOrReply(c,object,&tval,
        "timeout is not an integer or out of range") != C_OK)
        return C_ERR;

    if (tval < 0) {
        addReplyError(c,"timeout is negative");
        return C_ERR;
    }

    if (tval > 0) {
        long long now = mstime();

        if (unit == UNIT_SECONDS) {
            /* Signed overflow is undefined behavior: refuse values for
             * which 'tval * 1000' cannot be represented. */
            if (tval > LLONG_MAX / 1000) {
                addReplyError(c,"timeout is out of range");
                return C_ERR;
            }
            tval *= 1000;
        }

        /* Same concern for 'tval + now'. The 'now > 0' test keeps the
         * subtraction below from overflowing if the clock is negative. */
        if (now > 0 && tval > LLONG_MAX - now) {
            addReplyError(c,"timeout is out of range");
            return C_ERR;
        }
        tval += now;
    }

    *timeout = tval;
    return C_OK;
}
2. blockClient(), 将一个client的CLIENT_BLOCKED位置位,并设置具体阻塞的类型
/* Put client 'c' into the blocked state for an operation of kind 'btype'.
 *
 * Three pieces of state are updated: the server-wide count of blocked
 * clients is incremented, the blocking type is recorded in the client, and
 * the CLIENT_BLOCKED bit is set in the client flags. */
void blockClient(client *c, int btype) {
    server.bpop_blocked_clients += 1;
    c->btype = btype;
    c->flags = c->flags | CLIENT_BLOCKED;
}
3. processUnblockedClients(),当客户端被解除阻塞之后,该函数会在event loop的beforeSleep()函数中被调用,用于处理这些client的input buffer
/* Drain the server.unblocked_clients list.
 *
 * Each client is popped from the head of the list and has its
 * CLIENT_UNBLOCKED flag cleared. If the client is not blocked again and has
 * pending bytes in its query buffer, that buffer is processed right away. */
void processUnblockedClients(void) {
    while (listLength(server.unblocked_clients) != 0) {
        listNode *head = listFirst(server.unblocked_clients);
        client *cl;

        serverAssert(head != NULL);
        cl = head->value;
        listDelNode(server.unblocked_clients,head);
        cl->flags &= ~CLIENT_UNBLOCKED;

        /* A client that is blocked again must not have its input consumed.
         * processInputBuffer() performs the same check on its own, but being
         * explicit here keeps this function correct even if that changes. */
        if (cl->flags & CLIENT_BLOCKED) continue;

        /* Only bother when there is actually something left to parse. */
        if (cl->querybuf && sdslen(cl->querybuf) > 0)
            processInputBuffer(cl);
    }
}
4. unblockClient(),根据当前client被阻塞的类型进行unblock,然后清除阻塞标志,并把client放入unblocked_clients列表
/* Unblock client 'c'.
 *
 * The type-specific unblock routine is selected from c->btype (list, wait
 * or module); any other type triggers a panic. Afterwards the blocked state
 * is reset, the server-wide blocked counter is decremented, and the client
 * is appended to server.unblocked_clients unless it is already flagged as
 * being there. */
void unblockClient(client *c) {
    switch (c->btype) {
    case BLOCKED_LIST:
        unblockClientWaitingData(c);
        break;
    case BLOCKED_WAIT:
        unblockClientWaitingReplicas(c);
        break;
    case BLOCKED_MODULE:
        unblockClientFromModule(c);
        break;
    default:
        serverPanic("Unknown btype in unblockClient().");
        break;
    }

    /* Reset the blocked state of the client and update the accounting. */
    c->btype = BLOCKED_NONE;
    c->flags &= ~CLIENT_BLOCKED;
    server.bpop_blocked_clients--;

    /* CLIENT_UNBLOCKED set means a previous blocking operation already
     * queued this client: never insert it into the list twice. */
    if (c->flags & CLIENT_UNBLOCKED) return;

    /* Queue the client so that pending commands in its query buffer are
     * processed as soon as possible. */
    c->flags |= CLIENT_UNBLOCKED;
    listAddNodeTail(server.unblocked_clients,c);
}
5. replyToBlockedClientTimedOut(),当被阻塞的client到达time out时间后,给其发送响应
/* Send the timeout reply to blocked client 'c', according to c->btype:
 *
 *   BLOCKED_LIST   -> the shared null multi bulk reply.
 *   BLOCKED_WAIT   -> the value returned by replicationCountAcksByOffset()
 *                     for the offset stored in c->bpop.reploffset.
 *   BLOCKED_MODULE -> delegated to moduleBlockedClientTimedOut().
 *
 * Any other blocking type triggers a panic. This function only emits the
 * reply; it does not change the blocked state of the client. */
void replyToBlockedClientTimedOut(client *c) {
    switch (c->btype) {
    case BLOCKED_LIST:
        addReply(c,shared.nullmultibulk);
        break;
    case BLOCKED_WAIT:
        addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset));
        break;
    case BLOCKED_MODULE:
        moduleBlockedClientTimedOut(c);
        break;
    default:
        serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
        break;
    }
}
6. disconnectAllBlockedClients(),这个函数只有在一个节点由主节点变为从节点的时候才会被调用:它会给每个被阻塞的客户端发送一个unblocked error,将其unblock,并且在回复发送之后断开与客户端的连接
void disconnectAllBlockedClients(void) {
listNode *ln;
listIter li;
listRewind(server.clients,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (c->flags & CLIENT_BLOCKED) {
addReplySds(c,sdsnew(
"-UNBLOCKED force unblock from blocking operation, "
"instance state changed (master -> slave?)\r\n"));
unblockClient(c);
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
}
}
}