绑定完请刷新页面
取消
刷新

分享好友

×
取消 复制
trino新建带分区的外部表(trino命令)
2022-05-12 15:29:50

1、原数据上传 minio 前 timedelta ( interval )类型数据处理

def change_timedelta64_type(zeek_df):
    """Convert every timedelta column of ``zeek_df`` to float seconds, in place.

    Args:
        zeek_df: pandas DataFrame. Columns whose dtype is a timedelta are
            overwritten with their duration in seconds; all other columns
            are left untouched.

    Returns:
        None. The frame passed in is mutated.
    """
    for column in zeek_df.columns:
        # dtype kind "m" is the timedelta64 family, whatever its resolution
        # (ns, us, ms, s), so the check does not depend on 'timedelta64[ns]'.
        if zeek_df[column].dtype.kind == 'm':
            # Vectorised equivalent of calling total_seconds() on each value.
            zeek_df[column] = zeek_df[column].dt.total_seconds()

2、生成建表命令[字段对应格式]

# Column-name -> Trino type lookup tables. They are built once at import time
# instead of being re-created for every column inside the loop.
_TIMESTAMP_COLUMNS = frozenset({
    'compile_ts',
    'ref_time', 'org_time', 'rec_time', 'xmt_time',  # ntp
    'till',  # kerberos
    'certificate_not_valid_before', 'certificate_not_valid_after',  # x509
    'times_modified', 'times_accessed', 'times_created', 'times_changed',  # smb_files
})

# Ports and counters. 'version' is deliberately absent: it is decided per log
# type in _trino_type, so an entry here could never be reached.
_BIGINT_COLUMNS = frozenset({
    'year', 'month', 'day', 'id_orig_p', 'id_resp_p', 'file_size',
    'reply_code', 'data_channel_resp_p',  # ftp
    'dcc_file_size',  # irc
    'mode', 'stratum', 'num_exts',  # ntp
    'trans_id', 'qclass', 'qtype', 'rcode', 'Z',  # dns
    'trans_depth',  # smtp, http
    'depth', 'seen_bytes', 'total_bytes', 'missing_bytes', 'overflow_bytes',
    'extracted_size',  # files
    'orig_bytes', 'resp_bytes', 'missed_bytes', 'orig_pkts', 'orig_ip_bytes',
    'resp_pkts', 'resp_ip_bytes',  # conn
    'certificate_version', 'certificate_key_length',
    'basic_constraints_path_len',  # x509
    'auth_attempts',  # ssh
    'request_body_len', 'response_body_len', 'status_code', 'info_code',  # http
    'p', 'n',  # notice
    'size',  # smb_files
})

# No column is currently declared BOOLEAN: every candidate below is disabled,
# so these fields fall through to VARCHAR. Move a name into the tuple to
# declare it BOOLEAN.
_BOOLEAN_COLUMNS = frozenset((
    # "data_channel_passive",  # ftp
    # "success",  # ntlm, packet_filter, kerberos
    # "is_exe", "is_64bit", "uses_aslr", "uses_dep", "uses_code_integrity",
    # "uses_seh", "has_import_table", "has_export_table", "has_cert_table",
    # "has_debug_data",  # pe
    # "rejected", "AA", "TC", "RD", "RA",  # dns
    # "init",  # packet_filter
    # "tls",  # smtp
    # "forwardable", "renewable",  # kerberos
    # "local_orig", "is_orig", "timedout", "extracted_cutoff",  # files
    # "local_orig", "local_resp",  # conn
    # "notice",  # weird
    # "basic_constraints_ca",  # x509
    # "auth_success",  # ssh
    # "resumed", "established",  # ssl
))

# Interval-like values that are stored as floating point numbers.
_DOUBLE_COLUMNS = frozenset({
    'poll', 'precision', 'root_delay', 'root_disp',  # ntp
    'rtt',  # dns, dce_rpc
    'duration',  # files, dhcp
    'lease_time',  # dhcp
    'suppress_for', 'remote_location_latitude',
    'remote_location_longitude',  # notice
})

# Written with explicit "\n" so the trailing space after ")" and the trailing
# four-space indent of the emitted text stay visible and cannot be stripped
# by an editor.
_CREATE_TABLE_TEMPLATE = (
    "\n"
    "CREATE TABLE datalake.zeeks.{table} ({columns}\n"
    "          ) \n"
    "          WITH (\n"
    "          format='Parquet',\n"
    "          partitioned_by = ARRAY['{part1}','{part2}','{part3}'],\n"
    "          external_location = 's3://datalake/zeek/{table}'\n"
    "          );\n"
    "CALL system.sync_partition_metadata('zeeks','{table}','FULL');\n"
    "SELECT * FROM datalake.zeeks.{table};\n"
    "    "
)


def _trino_type(log_type, column):
    """Return the Trino column type used for ``column`` of log ``log_type``."""
    if column == 'version':
        # NOTE(review): the original BIGINT list also carried 'version' tagged
        # "# ssh", but that entry was unreachable behind this branch, so ssh
        # has always been emitted as VARCHAR. Confirm against the stored
        # Parquet type before adding 'ssh' here.
        return "BIGINT" if log_type in ('ntp',) else "VARCHAR"
    if column == 'mail_from':
        # NOTE(review): presumably the kerberos "from" time arrives under
        # this name; verify against the uploaded data.
        return "TIMESTAMP" if log_type in ('kerberos',) else "VARCHAR"
    if column in _TIMESTAMP_COLUMNS:
        return "TIMESTAMP"
    if column in _BIGINT_COLUMNS:
        return "BIGINT"
    if column in _BOOLEAN_COLUMNS:
        return "BOOLEAN"
    if column in _DOUBLE_COLUMNS:
        return "DOUBLE"
    return "VARCHAR"


def get_cmd(log_type, keys_str):
    """Build the Trino statements that create a partitioned external table.

    Args:
        log_type: Table name; also used as the last path component of the
            external location ``s3://datalake/zeek/<log_type>``.
        keys_str: Text containing the column names in single quotes. The last
            quoted token is discarded (presumably the ``dtype='object'`` tail
            of ``str(DataFrame.columns)`` - confirm with the caller), and the
            last three remaining names become the partition columns.

    Returns:
        A string with three statements: ``CREATE TABLE``, a
        ``sync_partition_metadata`` call and a ``SELECT *`` on the new table.

    Raises:
        ValueError: If ``keys_str`` holds fewer than four quoted names, i.e.
            fewer than three columns remain once the trailing token is
            dropped. Previously this surfaced as a bare ``IndexError``.
    """
    keys = re.findall("'(.*?)'", keys_str)
    if len(keys) < 4:
        raise ValueError(
            "keys_str must contain at least three quoted column names "
            "followed by one trailing quoted token, got {!r}".format(keys_str)
        )
    keys.pop()  # drop the trailing quoted token

    column_defs = [
        "\n        {} {}".format(key, _trino_type(log_type, key))
        for key in keys
        if key != "object"  # a stray dtype token is never a column
    ]
    return _CREATE_TABLE_TEMPLATE.format(
        table=log_type,
        columns=",".join(column_defs),
        part1=keys[-3],
        part2=keys[-2],
        part3=keys[-1],
    )

3、在 trino 命令行执行生成的命令(sync_partition_metadata 的参数依次为 schema_name、table_name、mode)

-- Sample output of the generator for the "ftp" log: declare an external table
-- over the Parquet files stored under s3://datalake/zeek/ftp.
-- The partition columns (year, month, day) are the last three columns and are
-- listed in the same order in partitioned_by.
CREATE TABLE datalake.zeeks.ftp (
        uid VARCHAR,
        id_orig_h VARCHAR,
        id_orig_p BIGINT,
        id_resp_h VARCHAR,
        id_resp_p BIGINT,
        user VARCHAR,
        password VARCHAR,
        command VARCHAR,
        arg VARCHAR,
        mime_type VARCHAR,
        file_size BIGINT,
        reply_code BIGINT,
        reply_msg VARCHAR,
        data_channel_passive VARCHAR,
        data_channel_orig_h VARCHAR,
        data_channel_resp_h VARCHAR,
        data_channel_resp_p BIGINT,
        fuid VARCHAR,
        ts_str TIMESTAMP,
        year BIGINT,
        month BIGINT,
        day BIGINT
          ) 
          WITH (
          format='Parquet',
          partitioned_by = ARRAY['year','month','day'],
          external_location = 's3://datalake/zeek/ftp'
          );
-- Arguments are (schema_name, table_name, mode); FULL mode is requested so the
-- partitions already present at the external location become queryable.
CALL system.sync_partition_metadata('zeeks','ftp','FULL');
-- Quick check that the table and its partitions can be read.
SELECT * FROM datalake.zeeks.ftp;
原文链接:https://zhuanlan.zhihu.com/p/366245621
分享好友

分享这个小栈给你的朋友们,一起进步吧。

Trino
创建时间:2022-04-12 14:37:38
Trino
展开
订阅须知

• 所有用户可根据关注领域订阅专区或所有专区

• 付费订阅:虚拟交易,一经交易不退款;若特殊情况,可3日内客服咨询

• 专区发布评论属默认订阅所评论专区(除付费小栈外)

技术专家

查看更多
  • 飘絮絮絮丶
    专家
戳我,来吐槽~