背景知识
为了更好说明neo4j集群中的读写及其一致性行为,有必要先对neo4j的部署模式,驱动,会话,事务等概念做下基本介绍。
neo4j部署模式
neo4j有3种部署模式,分别叫stand-alone instance(单机模式),HA cluster(高可用模式)和causal cluster(因果集群模式,本文简称为集群模式)。单机模式用于neo4j开源版本,后2种模式仅在neo4j企业版能够使用,高可用模式在neo4j早期版本上使用,新的neo4j企业版采用更加强大的集群模式。通过neo4j的配置项dbms.mode可以设置节点的角色:
- 单机模式:节点角色为SINGLE
- 高可用模式:节点角色可为HA或ARBITER,ARBITER表示没有数据的节点;
- 集群模式:节点角色可为CORE或READ_REPLICA。
集群模式
causal cluster集群的示意图如下所示:
图中间是由3个CORE节点构成的核心读写集群,使用raft协议进行数据一致性保障,会自动选择一个节点作为LEADER提供读写服务,其余节点作为FOLLOWER节点复制LEADER推送的raft日志;如果LEADER节点由于网络隔离或crash而无法正常服务,FOLLOWER节点会进行重新选择一个新的LEADER节点。为了提高集群稳定性,减少不必要的选主操作,选主会分为2个阶段:预选和正式选举。如果在预选时没有得到足够多的节点回复,就不会发起正式选举(与MongoDB类似)。
外围一圈为只读集群,节点角色均为READ_REPLICA,提供查询、报表和分析等只读服务。只读节点异步复制CORE节点的事务日志进行数据catchup。
BOLT驱动
neo4j提供了强大的客户端驱动,能够自动处理LEADER节点变更,基于负载进行请求路由,用户还可以根据业务需求来自定义路由策略,非常灵活。
BOLT驱动有2种使用模式,分别是bolt和bolt+routing。前者一般用于单机部署模式,或者想直连集群下的某个特定节点;后者是连接到整个集群上。连接URI如下所示:
jdbc:neo4j:bolt://host:port/?username=neo4j,password=xxxx
jdbc:neo4j:bolt+routing://host1:port1,host2:port2,…,hostN:portN/?username=neo4j,password=xxxx
host1:port1,host2:port2,…,hostN:portN为需要连接的集群的成员列表,可包括CORE和READ_REPLICA节点。 在集群模式下,建议使用bolt+routing连接方式。通过自动切换到新的LEADER节点来提高业务的服务可用性,通过将请求路由到不同的节点实现集群的负载均衡。需要注意的是,如果没有设置读写模式(Access Mode)为read,请求会默认路由到CORE节点。 bolt+routing另一个强大的功能是支持自定义policy,也就是说业务端可以自定义路由规则,可以做到某个请求发送到集群的特定节点上执行。在此不展开描述,可点击了解更多信息。
会话与事务
在neo4j中,会话(Session)可以理解为连接,在会话中执行一个或多个事务。会话从驱动的连接池中获取连接,在关闭时将连接返给连接池。 neo4j中的事务有3种表现形式:
- 自动提交事务
- 事务函数
- 显式事务
自动提交事务(Auto-commit Transactions)
与MySQL等关系型数据库类似,该类型事务仅有一条cypher语句组成,无法自动重试。直接执行方式是session.run(),没有创建事务对象。如下所示:
public void addPerson( String name )
{
try ( Session session = driver.session() )
{
session.run( "CREATE (a:Person {name: $name})", parameters( "name", name ) );
}
}
neo4j不推荐在生产环境中使用自动提交事务,理由是非性能佳的事务执行方式,主要是从网络数据包交互角度出发的。笔者认为,自动提交事务还是有特定的应用场景的。
事务函数
这是neo4j特有的事务形式,形如:
public void addPerson( final String name )
{
try ( Session session = driver.session() )
{
session.writeTransaction( new TransactionWork<Integer>()
{
@Override
public Integer execute( Transaction tx )
{
return createPersonNode( tx, name );
}
} );
}
}
private static int createPersonNode( Transaction tx, String name )
{
tx.run( "CREATE (a:Person {name: $name})", parameters( "name", name ) );
return 1;
}
在函数中创建session,然后基于该session产生一个事务对象,再重载事务的execute方法,通过调用Transaction.run方式来执行cypher语句。
这是neo4j推荐的事务执行方式,用尽可能少的框架代码来将数据库事务和其他业务逻辑相隔离,具备事务重试能力。
显式事务
显然,该事务类型就是显式调用BEGIN、COMMIT/ROLLBACK来开始、提交或回滚事务。具体例子在下节给出。
读写模式(Access Mode)
类似于MySQL中的只读事务和读写事务。但不同的是,neo4j中可以在会话/连接级别进行设置。
try (Connection connection = DriverManager.getConnection(connectionUrl, "neo4j", password)) {
connection.setAutoCommit(false);
// Access to CORE nodes, as the connection is opened by default in write mode
try (Statement statement = connection.createStatement()) {
statement.execute("create (:Neo4JTest { protocol: 'BOLT+ROUTING' })");
}
// closing transaction before changing access mode
connection.commit();
// Switching to read-only mode to access READ REPLICA nodes
connection.setReadOnly(true);
try (Statement statement = connection.createStatement()) {
try (ResultSet resultSet = statement.executeQuery("match (t:Neo4JTest) return count(t) as tot")) {
if (resultSet.next()) {
Long tot = resultSet.getLong("tot");
}
}
}
connection.commit();
}
上面的例子中涉及到了显式事务和读写设置。通过connection.setAutoCommit(false)开始了显式事务,connection.commit()进行事务提交。connection.setReadOnly(true)将事务设置为只读模式。 下面的例子是在事务函数中进行读写事务设置:
public long addPerson( final String name )
{
try ( Session session = driver.session() )
{
session.writeTransaction( new TransactionWork<Void>()
{
@Override
public Void execute( Transaction tx )
{
return createPersonNode( tx, name );
}
} );
return session.readTransaction( new TransactionWork<Long>()
{
@Override
public Long execute( Transaction tx )
{
return matchPersonNode( tx, name );
}
} );
}
}
private static Void createPersonNode( Transaction tx, String name )
{
tx.run( "CREATE (a:Person {name: $name})", parameters( "name", name ) );
return null;
}
private static long matchPersonNode( Transaction tx, String name )
{
StatementResult result = tx.run( "MATCH (a:Person {name: $name}) RETURN id(a)", parameters( "name", name ) );
return result.single().get( 0 ).asLong();
}
在addPerson的事务函数中,分别包括一个写和读事务:创建一条标签为Person的节点,其name属性为输入的name字符串;然后使用读事务获取其id。
因果关系链(Causal chaining)
所谓因果关系链,指的是因果一致性,即用户应该能够读取到其之前写入数据库的数据。对于单机模式的数据库,因果一致性总能保证的,只需要确保写入数据的事务已提交,使用当前读(与快照读相对应)。但在集群模式下,因果一致性就不是自然而然的事情了。比如,用户在LEADER节点插入了一个节点,是否可以在某一只读节点读到该边呢?由于读写操作跨了neo4j进程,而且只读节点与核心节点的复制是异步的,显然可能存在数据延迟情况。
那么neo4j的集群模式是否能够保证因果一致性呢,以及如何保证的呢?答案是肯定的。neo4j能够实现任意2个事务的因果一致性,即使事务执行时跨节点的。
When working with a Causal Cluster, transactions can be chained to ensure causal consistency. This means that for any two transactions, it is guaranteed that the second transaction will begin only after the first has been successfully committed. This is true even if the transactions are carried out on different physical cluster members.
neo4j中的因果关系链包括2个层面,在一个会话中的多个事务,neo4j自动保证了因果一致性,无需用户额外编程保障。因此,如果用户执行多个相互间没有关联的事务,可以将这些事务放在不同的会话中以减小不必要的延迟;对于会话间的事务,需要在代码中增加对应的处理。
基于书签(bookmarks)的因果一致性实现
在这里,书签是什么意思呢,从字面理解,书签应该是标记了书中的某个位置。对应到数据库中,书签的意思有点像MySQL中的gtid_executed,即下图的Executed_Gtid_Set。
MySQL [performance_schema]> show master status\G
*************************** 1. row ***************************
File: mysql-bin.000117
Position: 422155199
Binlog_Do_DB:
Binlog_Ignore_DB:
Executed_Gtid_Set: 03a6a5c8-7ebc-11e8-ba8e-fa163e132314:1-208233098
1 row in set (0.00 sec)
MySQL [performance_schema]>
在MySQL中,每个事务有个gtid,由server_uuid和transaction_id组成,形式上为server_uuid:transaction_id,本例中server_uuid为03a6a5c8-7ebc-11e8-ba8e-fa163e132314,该mysql实例已经执行的transaction_id从1开始连续到208233098。可以用gtid_executed来确定一个事务在该MySQL实例中的次序。显然,如果A的事务执行后对应的gtid_executed为03a6a5c8-7ebc-11e8-ba8e-fa163e132314:1-100,那么后续执行的B事务,如果想读取到A事务提交的数据,那么只需要等待执行B事务对应的MySQL实例的Executed_Gtid_Set是03a6a5c8-7ebc-11e8-ba8e-fa163e132314:1-100的超集就行了。 在neo4j中,书签的意思正式如此,下面用具体的例子来进行说明。我们先看一个图:
该图比较直观得说明了书签在会话中和会话间的作用方式。在图中3个会话中,每个会话都有2个事务,书签在事务间自动传递。而如果要将会话A和B的书签传递给C,那么就需要在代码中显式获取并作为输入参数传递给C。下面的例子与上图相对应:
// Create a company node
private StatementResult addCompany( final Transaction tx, final String name )
{
return tx.run( "CREATE (:Company {name: $name})", parameters( "name", name ) );
}
// Create a person node
private StatementResult addPerson( final Transaction tx, final String name )
{
return tx.run( "CREATE (:Person {name: $name})", parameters( "name", name ) );
}
// Create an employment relationship to a pre-existing company node.
// This relies on the person first having been created.
private StatementResult employ( final Transaction tx, final String person, final String company )
{
return tx.run( "MATCH (person:Person {name: $person_name}) " +
"MATCH (company:Company {name: $company_name}) " +
"CREATE (person)-[:WORKS_FOR]->(company)",
parameters( "person_name", person, "company_name", company ) );
}
// Create a friendship between two people.
private StatementResult makeFriends( final Transaction tx, final String person1, final String person2 )
{
return tx.run( "MATCH (a:Person {name: $person_1}) " +
"MATCH (b:Person {name: $person_2}) " +
"MERGE (a)-[:KNOWS]->(b)",
parameters( "person_1", person1, "person_2", person2 ) );
}
// Match and display all friendships.
private StatementResult printFriends( final Transaction tx )
{
StatementResult result = tx.run( "MATCH (a)-[:KNOWS]->(b) RETURN a.name, b.name" );
while ( result.hasNext() )
{
Record record = result.next();
System.out.println( String.format( "%s knows %s", record.get( "a.name" ).asString(), record.get( "b.name" ).toString() ) );
}
return result;
}
public void addEmployAndMakeFriends()
{
// To collect the session bookmarks
List<String> savedBookmarks = new ArrayList<>();
// Create the first person and employment relationship.
try ( Session session1 = driver.session( AccessMode.WRITE ) )
{
session1.writeTransaction( tx -> addCompany( tx, "Wayne Enterprises" ) );
session1.writeTransaction( tx -> addPerson( tx, "Alice" ) );
session1.writeTransaction( tx -> employ( tx, "Alice", "Wayne Enterprises" ) );
savedBookmarks.add( session1.lastBookmark() );
}
// Create the second person and employment relationship.
try ( Session session2 = driver.session( AccessMode.WRITE ) )
{
session2.writeTransaction( tx -> addCompany( tx, "LexCorp" ) );
session2.writeTransaction( tx -> addPerson( tx, "Bob" ) );
session2.writeTransaction( tx -> employ( tx, "Bob", "LexCorp" ) );
savedBookmarks.add( session2.lastBookmark() );
}
// Create a friendship between the two people created above.
try ( Session session3 = driver.session( AccessMode.WRITE, savedBookmarks ) )
{
session3.writeTransaction( tx -> makeFriends( tx, "Alice", "Bob" ) );
session3.readTransaction( this::printFriends );
}
}
该例子仍使用事务函数。主函数为addEmployAndMakeFriends(),在函数中创建了3个session,分别为session1,session2和session3,3个会话均设置读写模式为AccessMode.WRITE。session1和session2分别通过3个事务建立一对员工和公司的雇佣关系(Alice受雇于Wayne Enterprises,Bob受雇于LexCorp);可以看到在创建员工和公司的事务后,建立雇佣关系的事务没有书签相关代码,这是因为在会话内部自动传递了事务书签,在建立雇佣关系时会自动等到对应neo4j节点执行完员工和公司2个事务后才开始。 而在另一个会话session3对session1和session2创建的Alice和Bob建立朋友关系时,处理就不一样了。这是由于跨了会话,事务书签无法自动传递,需要在session3会话产生是显式传入所需的书签参数,即下例中的savedBookmarks:
Session session3 = driver.session( AccessMode.WRITE, savedBookmarks )
savedBookmarks保存了session1和session2的3个事务执行后的书签:
savedBookmarks.add( session1.lastBookmark() );
savedBookmarks.add( session2.lastBookmark() );
因此,可以确保session3在创建2个人的朋友关系前会等待session1和session2中的事务均已在对应的neo4j节点上成功提交。
小结
本文简要介绍了neo4j集群中与读写相关的事务,会话和用于请求路由的驱动等基础,在此基础上提出了有依赖关系的事务之间因果一致性问题,后分析neo4j集群如何在跨物理节点的场景下实现因果一致性。并用实例进行了说明。