JoinQuant Data (JQData) Localization Solution: Based on MongoDB (Part 2)
2020-05-22 10:59:39

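(Continued from the previous part)

We now wrap BaseModel one step further to make it better suited to data analysts, especially those who program without a computer-science background. What they want to see is a table, much like Excel, rather than a pile of dicts or arrays. Let's create another file, modeldata.py: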

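import datetime
import pandas as pd
import numpy as np

from ..db import BaseModel
# from Calf.exception import MongoIOError, FileError, ExceptionInfo, \
#     WarningMessage, SuccessMessage  # custom exception-handling helpers; you can ignore them for now

# Minimal stand-ins (an assumption, not the Calf originals) so the names used below exist
class MongoIOError(Exception):
    pass

def ExceptionInfo(e):
    print(e)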


class ModelData(object):
    """
    All database IO for the shared models goes through this class.
    General-purpose IO methods.
    """

    def __init__(self, location=None, dbname=None):
        self.location = location
        self.dbname = dbname

    # @classmethod
    def field(self, table_name, field_name, filter=None):
        """
        Query the distinct values of a field in the database
        :param table_name: the database's table name
        :param field_name: the table's field name
        :param filter: optional query conditions
        :return: all distinct values of the field
        """
        try:
            return BaseModel(table_name, self.location,
                             self.dbname).distinct(field_name, filter)
        except Exception as e:
            raise MongoIOError('failed to query the field') from e

    # @classmethod
    def max(self, table_name, field='_id', **kw):
        """
        Find the maximum value in the `field` column among the records matching kw.
        :param table_name: the table name
        :param field: the field to take the maximum over
        :param kw: query conditions
        :return: the maximum value, or None if no record matches
        """
        if not isinstance(field, str):
            raise TypeError('field must be an instance of str')
        cursor = BaseModel(table_name, self.location,
                           self.dbname).query(sql=kw, field={field: True})
        try:
            # Cursor.count() was removed in PyMongo 4, so check the materialized list instead
            records = list(cursor)
        finally:
            cursor.close()
        if not records:
            return None
        return pd.DataFrame(records)[field].max()

    # @classmethod
    def min(self, table_name, field='_id', **kw):
        """
        Find the minimum value in the `field` column among the records matching kw.
        :param table_name: the table name
        :param field: the field to take the minimum over
        :param kw: query conditions
        :return: the minimum value, or None if no record matches
        """
        if not isinstance(field, str):
            raise TypeError('field must be an instance of str')
        cursor = BaseModel(table_name, self.location,
                           self.dbname).query(sql=kw, field={field: True})
        try:
            # Cursor.count() was removed in PyMongo 4, so check the materialized list instead
            records = list(cursor)
        finally:
            cursor.close()
        if not records:
            return None
        return pd.DataFrame(records)[field].min()

    # @classmethod
    def insert_data(self, table_name, data):
        """
        A simple interface for inserting data
        :param table_name: the table name
        :param data: a DataFrame; each row is inserted as one record
        :return:
        """
        try:
            if len(data):
                d = data.to_dict(orient='records')
                BaseModel(table_name, self.location,
                          self.dbname).insert_batch(d)
        except Exception as e:
            raise MongoIOError('Failed with insert data by MongoDB') from e

    def insert_one(self, table_name, data):
        """
        insert one record
        :param table_name:
        :param data: a dict
        :return:
        """
        try:
            BaseModel(table_name, self.location,
                      self.dbname).insert(data)
        except Exception:
            raise MongoIOError('Failed with insert data by MongoDB')

    def read_one(self, table_name, field=None, **kw):
        """
        Sometimes only one record is needed, so there is no need to call read_data.
        :param table_name: the table name
        :param field: the fields to return
        :param kw: query conditions
        :return: a dict or None
        """
        record = None  # returned unchanged if the query fails
        try:
            record = BaseModel(table_name, self.location,
                               self.dbname).query_one(kw, field)
        except Exception as e:
            ExceptionInfo(e)
        return record

    # @classmethod
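    def read_data(self, table_name, field=None, **kw):
        """
        A simple interface for reading data
        :param table_name: the table name
        :param field: the fields to return
        :param kw: query conditions
        :return: a DataFrame (empty if nothing matches or the query fails)
        """
        data = pd.DataFrame()
        cursor = None  # stays None if the query itself raises
        try:
            cursor = BaseModel(table_name, self.location,
                               self.dbname).query(kw, field)
            data = pd.DataFrame(list(cursor))
        except Exception as e:
            ExceptionInfo(e)
        finally:
            if cursor is not None:
                cursor.close()
        return data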

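    def aggregate(self, table_name, pipeline):
        """
        Run an aggregation pipeline and return the result as a DataFrame
        :param table_name: the table name
        :param pipeline: the aggregation pipeline
        :return: a DataFrame (empty if the aggregation fails)
        """
        data = pd.DataFrame()
        cursor = None  # stays None if the aggregation itself raises
        try:
            cursor = BaseModel(table_name, self.location,
                               self.dbname).aggregate(pipeline)
            data = pd.DataFrame(list(cursor))
        except Exception as e:
            ExceptionInfo(e)
        finally:
            if cursor is not None:
                cursor.close()
        return data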

    # @classmethod
    def update_data(self, table_name, condition, **kw):
        """
        Update the records in table_name that match condition
        :param table_name: the table name
        :param condition: a dict such as {'date': datetime.datetime(2018, 1, 1)}
        :param kw: keyword arguments such as close=0
        :return:
        """
        try:
            r = BaseModel(table_name, self.location,
                          self.dbname).update_batch(condition, kw)
            return r
        except Exception as e:
            raise MongoIOError('Failed with update by MongoDB') from e

    # @classmethod
    def remove_data(self, table_name, **kw):
        """
        Delete data
        :param table_name: the table name
        :param kw: conditions selecting the records to delete
        :return:
        """
        try:
            r = BaseModel(table_name, self.location,
                          self.dbname).remove(kw)
            return r
        except Exception as e:
            raise MongoIOError('Failed with delete data by MongoDB') from e

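With the ModelData class, all the data in the database comes to us as a DataFrame, which is ideal for data analysts. But if you are not yet comfortable with pandas, sorry, friend, it is time for a change.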
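To make the "everything is a DataFrame" idea concrete, here is a minimal sketch of the two conversions that read_data and insert_data rely on. It needs no database: the records are made-up stand-ins for the dicts a MongoDB query would return, and the field names are only assumptions for illustration.

```python
import datetime as dt

import pandas as pd

# Made-up records, shaped like the dicts a MongoDB query would return.
records = [
    {"code": "000001.XSHE", "date": dt.datetime(2018, 1, 2), "close": 13.70},
    {"code": "000001.XSHE", "date": dt.datetime(2018, 1, 3), "close": 13.33},
]

# Reading: a list of dicts becomes a table, one row per record.
df = pd.DataFrame(records)

# Writing: the table goes back to a list of dicts, one per row.
round_trip = df.to_dict(orient="records")
```

The keys of the dicts become the column names, so a field missing from some records simply shows up as NaN in those rows.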

Next, let's use JoinQuant's local API to download the K-line data and store it locally.

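import datetime as dt

import jqdatasdk as jq

from modeldata import ModelData as md  # assumption: adjust this path to wherever you saved modeldata.py


jq.auth('***', '***')
data = jq.get_all_securities(types=[], date=None)
for code, r in data.iterrows():  # the security code is the index of the returned DataFrame
    d = jq.get_price(code, start_date=dt.datetime(2018, 1, 1), end_date=dt.datetime(2018, 10, 30),
                     frequency='daily', fields=None, skip_paused=False, fq='pre', count=None)
    # At this point you can also make some adjustments to d
    # Insert d into the database; with no arguments passed to md(), it goes into the default database ==> "default"
    md().insert_data(table_name='kline_day', data=d)
    # To insert d into a different database, pass arguments to md(), like this:
    # md(location='server1').insert_data(table_name='kline_day', data=d)  # 'server1' is the one we saw in the database-configuration JSON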
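One adjustment is worth making before the insert: insert_data calls to_dict(orient='records'), which does not include the DataFrame index. The sketch below assumes the price frame for a single security is indexed by date and has no code column. prepare_kline is a hypothetical helper, not part of jqdatasdk or ModelData, and the prices are made up.

```python
import pandas as pd


def prepare_kline(prices: pd.DataFrame, code: str) -> pd.DataFrame:
    """Turn a date-indexed price frame into rows that are ready to insert."""
    out = prices.copy()
    out.index.name = "date"  # name the index so it becomes a "date" column
    out = out.reset_index()  # move the dates from the index into that column
    out["code"] = code       # record which security each row belongs to
    return out


# Made-up stand-in for the frame returned for one security.
prices = pd.DataFrame(
    {"open": [10.0, 10.5], "close": [10.4, 10.2]},
    index=pd.to_datetime(["2018-01-02", "2018-01-03"]),
)
rows = prepare_kline(prices, "000001.XSHE").to_dict(orient="records")
```

With the date and code stored as ordinary fields, later queries such as read_data(table_name, code=...) have something to filter on.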
