绑定完请刷新页面
取消
刷新

分享好友

×
取消 复制
redis源码阅读之block操作底层实现
2019-09-03 15:01:47

在block操作底层的文件中,句话就是

/* blocked.c - generic support for blocking operations like BLPOP & WAIT.
 */

这边是提供了一些api,供外界调用

1. getTimeoutFromObjectOrReply(),在object当中获取timeout参数,具体如下

int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit) {
    long long tval;

    if (getLongLongFromObjectOrReply(c,object,&tval,
        "timeout is not an integer or out of range") != C_OK)
        return C_ERR;

    if (tval < 0) {
        addReplyError(c,"timeout is negative");
        return C_ERR;
    }

    if (tval > 0) {
        if (unit == UNIT_SECONDS) tval *= 1000;
        tval += mstime();
    }
    *timeout = tval;

    return C_OK;
}

2. blockClient(), 将一个client的CLIENT_BLOCKED位置位,并设置具体阻塞的类型

void blockClient(client *c, int btype) {
    c->flags |= CLIENT_BLOCKED;
    c->btype = btype;
    server.bpop_blocked_clients++;
}

3. processUnblockedClients(),当一个客户端被解出阻塞之后,在event loop当中的beforeSleep()函数中会被调用,用于处理这些client的input buffer 

void processUnblockedClients(void) {
    listNode *ln;
    client *c;

    while (listLength(server.unblocked_clients)) {
        ln = listFirst(server.unblocked_clients);
        serverAssert(ln != NULL);
        c = ln->value;
        listDelNode(server.unblocked_clients,ln);
        c->flags &= ~CLIENT_UNBLOCKED;

        /* Process remaining data in the input buffer, unless the client
         * is blocked again. Actually processInputBuffer() checks that the
         * client is not blocked before to proceed, but things may change and
         * the code is conceptually more correct this way. */
        if (!(c->flags & CLIENT_BLOCKED)) {
            if (c->querybuf && sdslen(c->querybuf) > 0) {
                processInputBuffer(c);
            }
        }
    }
}

4. unblockClient(), 根据当前client被阻塞的类型进行unblock,

void unblockClient(client *c) {
    if (c->btype == BLOCKED_LIST) {
        unblockClientWaitingData(c);
    } else if (c->btype == BLOCKED_WAIT) {
        unblockClientWaitingReplicas(c);
    } else if (c->btype == BLOCKED_MODULE) {
        unblockClientFromModule(c);
    } else {
        serverPanic("Unknown btype in unblockClient().");
    }
    /* Clear the flags, and put the client in the unblocked list so that
     * we'll process new commands in its query buffer ASAP. */
    c->flags &= ~CLIENT_BLOCKED;
    c->btype = BLOCKED_NONE;
    server.bpop_blocked_clients--;
    /* The client may already be into the unblocked list because of a previous
     * blocking operation, don't add back it into the list multiple times. */
    if (!(c->flags & CLIENT_UNBLOCKED)) {
        c->flags |= CLIENT_UNBLOCKED;
        listAddNodeTail(server.unblocked_clients,c);
    }
}

5. replyToBlockedClientTimedOut(),当被阻塞的client到达time out时间后,给其发送响应

void replyToBlockedClientTimedOut(client *c) {
    if (c->btype == BLOCKED_LIST) {
        addReply(c,shared.nullmultibulk);
    } else if (c->btype == BLOCKED_WAIT) {
        addReplyLongLong(c,replicationCountAcksByOffset(c->bpop.reploffset));
    } else if (c->btype == BLOCKED_MODULE) {
        moduleBlockedClientTimedOut(c);
    } else {
        serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
    }
}

6. disconnectAllBlockedClients(), 这个函数只有在一个节点由主节点变为从节点,的时候,会给客户端发送一个unblocked error,并且断开与客户端的连接

void disconnectAllBlockedClients(void) {
    listNode *ln;
    listIter li;

    listRewind(server.clients,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        if (c->flags & CLIENT_BLOCKED) {
            addReplySds(c,sdsnew(
                "-UNBLOCKED force unblock from blocking operation, "
                "instance state changed (master -> slave?)\r\n"));
            unblockClient(c);
            c->flags |= CLIENT_CLOSE_AFTER_REPLY;
        }
    }
}


分享好友

分享这个小栈给你的朋友们,一起进步吧。

龍门客栈
创建时间:2019-01-12 10:22:35
来新手村升级打怪啊!
展开
订阅须知

• 所有用户可根据关注领域订阅专区或所有专区

• 付费订阅:虚拟交易,一经交易不退款;若特殊情况,可3日内客服咨询

• 专区发布评论属默认订阅所评论专区(除付费小栈外)

栈主、嘉宾

查看更多
  • 栈栈
    栈主
  • gaokeke123
    嘉宾
  • ?
    嘉宾
  • 飘絮絮絮丶
    嘉宾

小栈成员

查看更多
  • 一号管理员
  • phyllis666
  • cynthia
  • 老七
戳我,来吐槽~