如果想获取一个stream所有的reader,那么必须调用这个函数:
Bitmapset *targets = GetLocalStreamReaders(relid);
如果stream下面没有reader,那么这个targets返回NULL。
我们跟到GetLocalStreamReaders里面看看
1 2 3 4 5 6 7 8 9 | Bitmapset * GetLocalStreamReaders(Oid relid) { Bitmapset *readers = GetAllStreamReaders(relid); if (stream_targets && readers) {<br> .....<br>} return readers; } |
中间的if不看,这个readers是通过调用GetAllStreamReaders来获取的,我们继续跟进去看看。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | Bitmapset * GetAllStreamReaders(Oid relid) { HeapTuple tup = SearchSysCache1(PIPELINESTREAMRELID, ObjectIdGetDatum(relid)); bool isnull; ..... raw = SysCacheGetAttr(PIPELINESTREAMRELID, tup, Anum_pipeline_stream_queries, &isnull); if (isnull) return NULL; ...... ReleaseSysCache(tup); return result; } |
这段代码就很有意思了。
如果isnull直接return,而后面的ReleaseSysCaceh没有执行。
这样上面的tup就一直存在,没有释放掉。
这样会导致后面的一个断言错误。
来看看下面堆栈信息。
TRAP: FailedAssertion("!(ct->refcount == 0)", File: "catcache.c", Line: 588, PID: 3829, Query: (null))
assertion failure at:
pipeline: bgworker: worker [postgres] (ExceptionalCondition+0xaf)[0x906b0f]
pipeline: bgworker: worker [postgres] (AtEOXact_CatCache+0x1e6)[0x8eb735]
pipeline: bgworker: worker [postgres] [0x4fe75a]
pipeline: bgworker: worker [postgres] (CommitTransactionCommand+0x72)[0x4ff19c]
pipeline: bgworker: worker [postgres] (ContinuousQueryWorkerMain+0x6cd)[0x7366a1]
pipeline: bgworker: worker [postgres] [0x7343f9]
pipeline: bgworker: worker [postgres] (StartBackgroundWorker+0x2bd)[0x7427ea]
pipeline: bgworker: worker [postgres] [0x75532a]
pipeline: bgworker: worker [postgres] [0x755646]
pipeline: bgworker: worker [postgres] [0x750473]
pipeline: bgworker: worker [postgres] (PostmasterMain+0x110c)[0x74f92a]
pipeline: bgworker: worker [postgres] [0x694f85]
/lib64/libc.so.6(__libc_start_main+0xf5)[0x7fb8eb84caf5]
pipeline: bgworker: worker [postgres] [0x462e09]
我们看看catcache.c:588
1 2 3 4 5 6 7 8 9 | 582 dlist_foreach(iter, bucket) 583 { 584 CatCTup *ct; 585 586 ct = dlist_container(CatCTup, cache_elem, iter.cur); 587 Assert(ct->ct_magic == CT_MAGIC); 588 Assert(ct->refcount == 0); 589 Assert(!ct->dead); 590 } |
我们看看ct->refcount的解释:
int refcount; /* number of active references */
这其实跟我修改的代码有关系,我们从上面堆栈信息分析。
ContinuousQueryWorkerMain-->CommitTransactionCommand
我在ContinuousQueryWorkerMain里面自己调用了
Bitmapset *targets = GetLocalStreamReaders(relid);
而我判断targets的时候,
1 2 3 | if (!targets) { donothing... } |
我特意看了一下官方的用法。
src/backend/pipeline/stream.c:200
1 2 3 4 5 6 7 8 | if (targets == NULL) { char *name = get_rel_name(pstmt->relid); ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg( "no continuous views are currently reading from stream %s" , name), errhint( "Use CREATE CONTINUOUS VIEW to create a continuous view that includes %s in its FROM clause." , name))); } |
很清楚的看到,这个里面直接丢了个ERROR,
直接abort,这样就不会像我上面堆栈信息那样,后面commit就会断言异常。
话说,一个stream下面没有readers是很正常的,但是这么明显是代码有错误,该释放的没有释放。
修改如下:
src/backend/catalog/pipeline_stream.c
GetAllStreamReaders函数
1 2 | if (isnull) return NULL; |
修改成
1 2 3 4 | if (isnull){ ReleaseSysCache(tup); return NULL; } |
这样在返回的时候就直接释放了tup。