缘起
最近在公司基于bigQuery开发埋点数据分析功能,所以总结一下自己封装的bigQuery查询工具类(网上关于bigQuery的文章比较少)
关于bigQuery的概念功能可以参考 bigQuery 官方文档
官方文档的示例中包含了很多操作
示例中一段查询代码
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import java.util.UUID;
/**
 * Minimal BigQuery sample: runs one standard-SQL query against the public
 * GitHub commits dataset and prints every returned row.
 */
public class SimpleApp {
    public static void main(String... args) throws Exception {
        BigQuery client = BigQueryOptions.getDefaultInstance().getService();

        final String sql =
            "SELECT commit, author, repo_name "
                + "FROM `bigquery-public-data.github_repos.commits` "
                + "WHERE subject like '%bigquery%' "
                + "ORDER BY subject DESC LIMIT 10";

        // Standard SQL (not legacy SQL); see https://cloud.google.com/bigquery/sql-reference/
        QueryJobConfiguration config =
            QueryJobConfiguration.newBuilder(sql).setUseLegacySql(false).build();

        // An explicit job ID makes the job creation safe to retry.
        JobId id = JobId.of(UUID.randomUUID().toString());
        JobInfo jobInfo = JobInfo.newBuilder(config).setJobId(id).build();

        // Submit the job, then block until the query has finished.
        Job job = client.create(jobInfo);
        job = job.waitFor();

        if (job == null) {
            throw new RuntimeException("Job no longer exists");
        }
        if (job.getStatus().getError() != null) {
            // job.getStatus().getExecutionErrors() lists every error, not only the latest one.
            throw new RuntimeException(job.getStatus().getError().toString());
        }

        TableResult rows = job.getQueryResults();

        // iterateAll() walks through every page of the result set.
        rows.iterateAll().forEach(row -> {
            // STRING column
            String commit = row.get("commit").getStringValue();

            // RECORD column with nested name/email fields
            FieldValueList author = row.get("author").getRecordValue();
            String name = author.get("name").getStringValue();
            String email = author.get("email").getStringValue();

            // Repeated STRING column: only the first element is printed
            String repoName = row.get("repo_name").getRecordValue().get(0).getStringValue();

            System.out.printf(
                "Repo name: %s Author name: %s email: %s commit: %s\n", repoName, name, email, commit);
        });
    }
}
复制代码
以上这段查询的代码,给我的感觉就是步骤都是类似的,都是创建一个JOB,等待查询,处理查询的结果集。如果我每写一个查询都要写这一大坨,那简直要恶心死。
其实以上的查询代码除了SQL和结果集的处理不一样,其他的都是一样,基于这一点来封装一个 bigQuery 查询工具类,想要达到的效果就是:我给你一段SQL,你给我处理好的结果。
Maven 依赖
首先需要引入 Maven 依赖,当然这个在 bigQuery 官方示例上是有的
<!-- Import the Google Cloud BOM so individual client libraries need no explicit version. -->
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>libraries-bom</artifactId>
      <version>24.0.0</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<!-- BigQuery client library; its version is managed by the BOM above. -->
<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigquery</artifactId>
  </dependency>
</dependencies>
复制代码
设置身份验证
要运行客户端库,必须先设置身份验证,也就是说需要你的服务帐号密钥才能去连接和操作bigQuery,官方文档中也说到这点,他提供的方案是:设置环境变量 GOOGLE_APPLICATION_CREDENTIALS
向应用代码提供身份验证凭据
我觉得这种方式对于我们来说不太友好,我不能在服务器上设置环境变量,我们现在都是微服务,部署在k8s上,所以这种方案也不知道如何使用(会这种方式的小伙伴一定要告诉我怎么使用)
所以我的做法是将 身份验证凭据 JSON 文件加在项目 resource 里,通过流的方式读取凭据。
// Service-account credential JSON, resolved from the classpath and injected as a Spring Resource.
@Value(value = "classpath:netpop-e792a-data-analytics.json")
private Resource dataAnalyticsResource;
复制代码
配置 BigQuery Bean
由上面那段查询案例可知,重要的一个 Bean 就是 BigQuery,所以把这个Bean 注册到IOC容器中。
/**
 * Spring configuration that registers the BigQuery client and the
 * {@link BigQueryHelper} wrapper as beans.
 */
@Configuration
public class BigQueryConfiguration {
    // Service-account credential JSON, resolved from the classpath.
    @Value(value = "classpath:netpop-e792a-data-analytics.json")
    private Resource dataAnalyticsResource;

    /**
     * Builds the core BigQuery client from the bundled credential file.
     *
     * @return a BigQuery service authenticated with the loaded credentials
     * @throws IOException if the credential resource cannot be opened or read
     */
    @Bean
    BigQuery bigQuery() throws IOException {
        GoogleCredentials credentials;
        // Close the resource stream once the key has been read; the original left it open.
        try (java.io.InputStream keyStream = dataAnalyticsResource.getInputStream()) {
            credentials = GoogleCredentials.fromStream(keyStream);
        }
        return BigQueryOptions.newBuilder().setCredentials(credentials).build().getService();
    }

    /**
     * Registers the BigQuery query helper in the IoC container.
     *
     * @param bigQuery the BigQuery client bean
     * @return a helper bound to the given client
     */
    @Bean
    BigQueryHelper bigQueryHelper(@Autowired BigQuery bigQuery) {
        return new BigQueryHelper(bigQuery);
    }
}
复制代码
工具类
package groot.data.analysis.support;
import com.google.cloud.bigquery.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.InvocationTargetException;
import java.math.BigDecimal;
import java.util.*;
/**
 * Convenience wrapper around the BigQuery client: give it a SQL string and it
 * returns the rows already converted to Java objects.
 *
 * <p>Bean mapping matches result columns to fields of the target class by name.
 * STRUCT columns (nested objects) are not supported, and paging is not exposed.
 *
 * @author wangchangjiu
 * @since 2021/9/2
 */
@Slf4j
public class BigQueryHelper {
    private BigQuery bigQuery;

    /**
     * Creates a helper without a client. Queries fail with
     * {@link IllegalStateException} until a client is supplied through
     * {@link #BigQueryHelper(BigQuery)}.
     */
    public BigQueryHelper() {
    }

    /**
     * @param bigQuery the BigQuery client used to run every query
     */
    public BigQueryHelper(BigQuery bigQuery) {
        this.bigQuery = bigQuery;
    }

    /**
     * Runs the query and maps every row to an instance of {@code returnType}.
     *
     * <p>For a simple type (String, boxed primitives, BigDecimal) the first
     * column of each row is converted. Otherwise {@code returnType} needs a
     * no-arg constructor, and each of its fields whose name equals a column
     * name is populated. Fields must not map to STRUCT columns.
     *
     * @param sql        standard-SQL query text
     * @param returnType element type of the returned list
     * @param <T>        element type
     * @return one element per result row, in result order; never {@code null}
     * @throws InterruptedException          if interrupted while waiting for the job
     * @throws UnsupportedOperationException if a mapped column is a STRUCT
     */
    public <T> List<T> queryForList(String sql, Class<T> returnType) throws InterruptedException {
        // Validate before the (billable) query is submitted.
        Assert.notNull(returnType, "returnType must not be null");
        TableResult result = execute(sql);
        List<T> results = new ArrayList<>();
        if (isBasicType(returnType)) {
            for (FieldValueList row : result.iterateAll()) {
                results.add(firstColumn(row, returnType));
            }
            return results;
        }
        Map<String, Field> fieldMap = getStringFieldMap(result);
        for (FieldValueList row : result.iterateAll()) {
            results.add(mapRow(row, fieldMap, returnType));
        }
        return results;
    }

    /**
     * Indexes the result schema by column name.
     *
     * @param result query result whose schema is read
     * @return column name to schema field
     */
    private Map<String, Field> getStringFieldMap(TableResult result) {
        FieldList fieldList = result.getSchema().getFields();
        Map<String, Field> fieldMap = new HashMap<>(fieldList.size());
        for (Field field : fieldList) {
            fieldMap.put(field.getName(), field);
        }
        return fieldMap;
    }

    /**
     * Submits the SQL as a query job, waits for it and returns its result set.
     *
     * @param sql standard-SQL query text
     * @return the result of the finished job
     * @throws InterruptedException if interrupted while waiting for the job
     * @throws RuntimeException     if the job vanished or finished with an error
     */
    private TableResult execute(String sql) throws InterruptedException {
        Assert.notNull(sql, "SQL must not be null");
        Assert.state(bigQuery != null, "BigQuery client must not be null");
        QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(sql).setUseLegacySql(false).build();
        // Create a job ID so that we can safely retry.
        JobId jobId = JobId.of(UUID.randomUUID().toString());
        Job queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
        // Wait for the query to complete.
        queryJob = queryJob.waitFor();
        if (queryJob == null) {
            throw new RuntimeException("Job no longer exists");
        } else if (queryJob.getStatus().getError() != null) {
            throw new RuntimeException(queryJob.getStatus().getError().toString());
        }
        // Get the results.
        return queryJob.getQueryResults();
    }

    /**
     * Runs the query and converts every row with a caller-supplied extractor.
     *
     * @param sql standard-SQL query text
     * @param rse converts one result row into a {@code T}
     * @param <T> element type
     * @return one element per result row, in result order; never {@code null}
     * @throws InterruptedException if interrupted while waiting for the job
     */
    public <T> List<T> queryForList(String sql, ResultSetExtractor<T> rse) throws InterruptedException {
        // Validate before the (billable) query is submitted.
        Assert.notNull(rse, "ResultSetExtractor must not be null");
        TableResult tableResult = execute(sql);
        List<T> results = new ArrayList<>();
        for (FieldValueList row : tableResult.iterateAll()) {
            results.add(rse.extractData(row));
        }
        return results;
    }

    /**
     * Runs the query and maps only its first row.
     *
     * <p>For a simple type the first column of that row is converted;
     * otherwise the row is mapped onto a new {@code returnType} instance as
     * described for {@link #queryForList(String, Class)}.
     *
     * @param sql        standard-SQL query text
     * @param returnType type of the returned value
     * @param <T>        result type
     * @return the mapped first row, or {@code null} when the query returned no
     *         rows or the selected scalar cell is NULL
     * @throws InterruptedException          if interrupted while waiting for the job
     * @throws UnsupportedOperationException if a mapped column is a STRUCT
     */
    public <T> T queryForSingleResult(String sql, Class<T> returnType) throws InterruptedException {
        // Validate before the (billable) query is submitted.
        Assert.notNull(returnType, "returnType must not be null");
        TableResult tableResult = execute(sql);
        // Use a single iterator: each iterateAll().iterator() call starts a fresh traversal.
        Iterator<FieldValueList> rows = tableResult.iterateAll().iterator();
        if (!rows.hasNext()) {
            return null;
        }
        FieldValueList firstRow = rows.next();
        if (isBasicType(returnType)) {
            return firstColumn(firstRow, returnType);
        }
        return mapRow(firstRow, getStringFieldMap(tableResult), returnType);
    }

    /**
     * Converts the first column of a row to the requested simple type.
     */
    @SuppressWarnings("unchecked") // resultWrapper returns the boxed form of returnType for every basic type it handles
    private <T> T firstColumn(FieldValueList row, Class<T> returnType) {
        return (T) resultWrapper(row.get(0), returnType);
    }

    /**
     * Creates a {@code returnType} instance and copies into it every column
     * whose name matches one of the type's fields.
     */
    private <T> T mapRow(FieldValueList row, Map<String, Field> fieldMap, Class<T> returnType) {
        T returnObj;
        try {
            returnObj = returnType.getDeclaredConstructor().newInstance();
        } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException ex) {
            throw new RuntimeException("reflect create object error :", ex);
        }
        ReflectionUtils.doWithFields(returnType, field -> {
            Field bigQueryField = fieldMap.get(field.getName());
            if (bigQueryField == null) {
                // No column with this name: leave the field at its default.
                return;
            }
            if (bigQueryField.getType().getStandardType() == StandardSQLTypeName.STRUCT) {
                throw new UnsupportedOperationException("unsupported returnType field include complex *");
            }
            Object value = resultWrapper(row.get(bigQueryField.getName()), field.getType());
            if (value == null && field.getType().isPrimitive()) {
                // A NULL cell cannot be stored in a primitive; keep the primitive's default value.
                return;
            }
            field.setAccessible(true);
            ReflectionUtils.setField(field, returnObj, value);
        });
        return returnObj;
    }

    /**
     * Converts one cell to the Java type of the target field.
     *
     * @param fieldValue cell to convert
     * @param returnType requested Java type
     * @return the converted value, {@code null} for a NULL cell, or the raw
     *         cell value when {@code returnType} is not a handled type
     */
    private Object resultWrapper(FieldValue fieldValue, Class<?> returnType) {
        // The typed getters reject NULL cells, so handle them up front.
        if (fieldValue == null || fieldValue.isNull()) {
            return null;
        }
        if (returnType == Boolean.class || returnType == boolean.class) {
            return fieldValue.getBooleanValue();
        } else if (returnType == Long.class || returnType == long.class) {
            return fieldValue.getLongValue();
        } else if (returnType == Integer.class || returnType == int.class) {
            // Fail loudly instead of silently truncating an INT64 that does not fit.
            return Math.toIntExact(fieldValue.getLongValue());
        } else if (returnType == Short.class || returnType == short.class) {
            return Short.valueOf(fieldValue.getStringValue());
        } else if (returnType == Byte.class || returnType == byte.class) {
            return Byte.valueOf(fieldValue.getStringValue());
        } else if (returnType == Double.class || returnType == double.class) {
            return fieldValue.getDoubleValue();
        } else if (returnType == Float.class || returnType == float.class) {
            return (float) fieldValue.getDoubleValue();
        } else if (returnType == BigDecimal.class) {
            return fieldValue.getNumericValue();
        } else if (returnType == String.class) {
            return fieldValue.getStringValue();
        }
        return fieldValue.getValue();
    }

    /**
     * Tells whether the type is converted directly from a single column
     * rather than populated field by field.
     *
     * @param returnType type to test
     * @param <T>        type parameter of {@code returnType}
     * @return {@code true} for String, primitives, their listed wrappers and BigDecimal
     */
    private <T> boolean isBasicType(Class<T> returnType) {
        return returnType == String.class || returnType.isPrimitive()
                || returnType == Boolean.class || returnType == Byte.class
                || returnType == Integer.class || returnType == Long.class
                || returnType == Double.class || returnType == Short.class
                || returnType == Float.class || returnType == BigDecimal.class;
    }
}
复制代码
这里对外主要提供了
// 获取列表 返回类型的字段不支持复杂类型
public <T> List<T> queryForList(String sql, Class<T> returnType) throws InterruptedException
// 查询列表,实现 ResultSetExtractor 接口 自定义提取数据
public <T> List<T> queryForList(String sql, ResultSetExtractor<T> rse) throws InterruptedException
// 查询返回单个结果集
public <T> T queryForSingleResult(String sql, Class<T> returnType) throws InterruptedException
复制代码
我这里主要的思想就是利用反射创建目标对象,将字段赋值进去
当然这里不支持返回类型是对象嵌套对象的形式,原因是那种比较复杂,而且我现在这里也没有这种场景。
还有就是这里没有支持分页等其他操作
使用工具类
使用的话就是注入 bigQueryHelper 工具类
以上就是一个简单的bigQuery 封装类,当然还可以进一步优化,封装得更全一点。
作者:一个没有追求的技术人
链接:https://juejin.cn/post/7036337598025072653