def change_timedelta64_type(zeek_df):
    """Convert every timedelta column of ``zeek_df`` to float seconds, in place.

    Args:
        zeek_df: pandas DataFrame. Each column whose dtype is a timedelta64
            type is overwritten with its total number of seconds as floats;
            all other columns are left untouched.

    Returns:
        None. The DataFrame passed in is modified in place.
    """
    for column in zeek_df.columns:
        series = zeek_df[column]
        # NumPy reports kind "m" for timedelta64 of any resolution, so this
        # also covers non-nanosecond timedelta columns.
        if series.dtype.kind == "m":
            # Vectorised equivalent of calling .total_seconds() per element.
            zeek_df[column] = series.dt.total_seconds()
2、生成建表命令[字段对应格式]
# Columns rendered as TIMESTAMP. 'ts_str' and 'ts' are deliberately not
# listed here, so they fall through to VARCHAR.
_TIMESTAMP_COLUMNS = frozenset({
    'compile_ts',
    'ref_time', 'org_time', 'rec_time', 'xmt_time',  # ntp
    'till',  # kerberos
    'certificate_not_valid_before', 'certificate_not_valid_after',  # x509
    'times_modified', 'times_accessed', 'times_created', 'times_changed',  # smb_files
})

# Columns rendered as BIGINT (ports and counts).
# NOTE(review): 'version' is intentionally absent. It is decided by the
# log-type special case in _column_type, which yields VARCHAR for every log
# type except ntp (including ssh) -- confirm that is the intended ssh type.
_BIGINT_COLUMNS = frozenset({
    'year', 'month', 'day', 'id_orig_p', 'id_resp_p', 'file_size',
    'reply_code', 'data_channel_resp_p',  # ftp
    'dcc_file_size',  # irc
    'mode', 'stratum', 'num_exts',  # ntp
    'trans_id', 'qclass', 'qtype', 'rcode', 'Z',  # dns
    'trans_depth',  # smtp, http
    'depth', 'seen_bytes', 'total_bytes', 'missing_bytes', 'overflow_bytes',
    'extracted_size',  # files
    'orig_bytes', 'resp_bytes', 'missed_bytes', 'orig_pkts', 'orig_ip_bytes',
    'resp_pkts', 'resp_ip_bytes',  # conn
    'certificate_version', 'certificate_key_length',
    'basic_constraints_path_len',  # x509
    'auth_attempts',  # ssh
    'request_body_len', 'response_body_len', 'status_code', 'info_code',  # http
    'p', 'n',  # notice
    'size',  # smb_files
})

# Columns rendered as BOOLEAN. Currently empty, so boolean-looking columns
# are emitted as VARCHAR. Candidates that are disabled for now:
#   data_channel_passive (ftp); success (ntlm, packet_filter, kerberos);
#   is_exe, is_64bit, uses_aslr, uses_dep, uses_code_integrity, uses_seh,
#   has_import_table, has_export_table, has_cert_table, has_debug_data (pe);
#   rejected, AA, TC, RD, RA (dns); init (packet_filter); tls (smtp);
#   forwardable, renewable (kerberos);
#   local_orig, is_orig, timedout, extracted_cutoff (files);
#   local_orig, local_resp (conn); notice (weird);
#   basic_constraints_ca (x509); auth_success (ssh);
#   resumed, established (ssl)
_BOOLEAN_COLUMNS = frozenset()

# Interval / floating-point columns rendered as DOUBLE.
_DOUBLE_COLUMNS = frozenset({
    'poll', 'precision', 'root_delay', 'root_disp',  # ntp
    'rtt',  # dns, dce_rpc
    'duration',  # files, dhcp
    'lease_time',  # dhcp
    'suppress_for', 'remote_location_latitude',
    'remote_location_longitude',  # notice
})

# Column name -> SQL type. The sets above are disjoint, so merge order does
# not affect the result.
_COLUMN_TYPES = {
    **dict.fromkeys(_TIMESTAMP_COLUMNS, "TIMESTAMP"),
    **dict.fromkeys(_BIGINT_COLUMNS, "BIGINT"),
    **dict.fromkeys(_BOOLEAN_COLUMNS, "BOOLEAN"),
    **dict.fromkeys(_DOUBLE_COLUMNS, "DOUBLE"),
}

# Statement template; kept flush-left because its whitespace is part of the
# generated command text.
_CREATE_TABLE_TEMPLATE = """
CREATE TABLE datalake.zeeks.{table} ({columns}
)
WITH (
format='Parquet',
partitioned_by = ARRAY['{part_1}','{part_2}','{part_3}'],
external_location = 's3://datalake/zeek/{table}'
);
CALL system.sync_partition_metadata('zeeks','{table}','FULL');
SELECT * FROM datalake.zeeks.{table};
"""


def _column_type(log_type, column):
    """Return the SQL type name used for ``column`` in a ``log_type`` table."""
    # These two names change type depending on which log they come from.
    if column == 'version':
        return "BIGINT" if log_type == 'ntp' else "VARCHAR"
    if column == 'mail_from':
        return "TIMESTAMP" if log_type == 'kerberos' else "VARCHAR"
    return _COLUMN_TYPES.get(column, "VARCHAR")


def get_cmd(log_type, keys_str):
    """Build the CREATE TABLE / partition-sync / SELECT command text for a log.

    Args:
        log_type: Log name; used as the table name, the last path component
            of the external location, and to resolve the log-specific
            column types ('version', 'mail_from').
        keys_str: Text containing the column names as single-quoted tokens.
            The final quoted token is always discarded (presumably the
            ``dtype='object'`` tail of a pandas Index repr -- confirm
            against the caller).

    Returns:
        A string holding the three statements. The last three remaining
        column names are used as the partition columns.

    Raises:
        IndexError: If ``keys_str`` has no quoted token, or fewer than three
            remain after the final one is discarded.
    """
    keys = re.findall(r"'(.*?)'", keys_str)
    keys.pop()  # drop the trailing quoted token (see docstring)
    columns = [
        "\n{} {}".format(key, _column_type(log_type, key))
        for key in keys
        if key != "object"  # a token named "object" is never a column
    ]
    return _CREATE_TABLE_TEMPLATE.format(
        table=log_type,
        columns=",".join(columns),
        part_1=keys[-3],
        part_2=keys[-2],
        part_3=keys[-1],
    )
3、在 Trino 命令行执行生成的命令(sync_partition_metadata 的参数依次为 schema_name、table_name、mode)
-- Sample generated command for the ftp log: defines table datalake.zeeks.ftp
-- over Parquet files at s3://datalake/zeek/ftp, partitioned by year/month/day.
CREATE TABLE datalake.zeeks.ftp (
uid VARCHAR,
id_orig_h VARCHAR,
id_orig_p BIGINT,
id_resp_h VARCHAR,
id_resp_p BIGINT,
user VARCHAR,
password VARCHAR,
command VARCHAR,
arg VARCHAR,
mime_type VARCHAR,
file_size BIGINT,
reply_code BIGINT,
reply_msg VARCHAR,
data_channel_passive VARCHAR,
data_channel_orig_h VARCHAR,
data_channel_resp_h VARCHAR,
data_channel_resp_p BIGINT,
fuid VARCHAR,
-- NOTE(review): ts_str is TIMESTAMP here, but get_cmd has 'ts_str' commented
-- out of its timestamp list and would emit VARCHAR -- confirm which is intended.
ts_str TIMESTAMP,
-- The partition columns (year, month, day) are the last three columns.
year BIGINT,
month BIGINT,
day BIGINT
)
WITH (
format='Parquet',
partitioned_by = ARRAY['year','month','day'],
external_location = 's3://datalake/zeek/ftp'
);
-- Sync the partition metadata of zeeks.ftp in FULL mode.
CALL system.sync_partition_metadata('zeeks','ftp','FULL');
-- Read the table back to check the result.
SELECT * FROM datalake.zeeks.ftp;
原文链接:https://zhuanlan.zhihu.com/p/366245621