绑定完请刷新页面
取消
刷新

分享好友

×
取消 复制
聚宽数据(JQData)本地化解决方案:基于MongoDB(上)
2020-05-22 10:59:10
——本篇文章 by 。大咖


MongoDB是由C++语言编写的一个基于分布式文件存储的开源NoSQL数据库系统,它提供了可扩展的高性能数据存储解决方案。

由于MongoDB是一个面向文档存储的数据库,所以操作起来比较简单和容易。MongoDB支持丰富的查询表达式,高性能的插入与查询操作,并在负载调节方面提供了较好的支持。MongoDB的这些特性很好地适应了量化策略开发项目对证券数据存储的需求。本文利用聚宽平台的本地化API获取证券数据,并以K线数据为例,演示了如何将获取到的K线数据存储在本地。

我们首先建立一个访问数据库的配置文件(这是一个json文件),这是为了方便在项目中同时访问多个数据源。

{
"default": {
// 这是一个常规的配置方案
// 把这个配置命名为"default"是有专门考量的,后文会提到这个问题
    "host": "127.0.0.1:57017",
    "dbname": "jq",
    "username": "user1",
    "password": "12345",
    "dbauth": "admin"
  },
"server1": {
// 当你的数据库没有配置用户角色的时候
// 你只需要填写下面这两个参数即可
    "host": "127.0.0.1:57016",
    "dbname": "ttt"
  },
"server2": {
// 这实际上是配置了一个复制集
// 关于复制集的概念,就不展开了
    "host": "127.0.0.1",
    "replicaset": "test_rs"
  },
"shard_a": {
// 这是配置了一个分片服务器
// 看起来和配置一个常规的数据库没什么区别
    "host": "127.0.0.1:40000",
    "dbname": "ttt"
  }
}

创建好数据库的访问配置文件后,紧接着就是添加一个连接数据库的db.py文件了:

# -*- coding: utf-8 -*-

from pymongo import *  # MongoDB的连接库
import json

# 构建一个默认的MongoDB对象置于项目运行的内存当中,避免频繁
# 的创建MongoDB对象
config_file =  'E:/db_config.json' # 上面创建的那个json文件的路径
try:
    with open(config_file) as f:
        l = f.read()
        a = json.loads(l)
        db = a['default']  # 默认访问的数据库,上文说过
    uri += '/?connectTimeoutMS=2000'
    if 'replicaset' in fields:
        uri += ';replicaSet=%s' % db['replicaset']
    if 'dbauth' in fields:
        uri += ';authSource=%s' % db['dbauth']
    print('db-uri:', uri)
    connection = MongoClient(uri)
    if 'dbname' in fields:
        mongodb = connection[db['dbname']]
    else:
        mongodb = None
except Exception:
    raise Exception('connection MongoDB raise a error')


class MongoDB:
    """数据库对象"""
    connection_count = 0
    # 当你在程序中,需要访问另一个数据库,那你就需要再临时创建一个连接对象了
    # 为了避免连续访问相同的数据库对象,设置location、connection
    # 两个类属性,保留近一次访问的对象
    location = None
    connection = None
    db_name = None

    @classmethod
    def db_connection(cls, location, db_name=None):
        """
        连接到数据库
        :param db_name:
        :param location:
        :return:
        """
        try:
            if location == cls.location and cls.connection is not None:
                # 这个连接对象已经存在
                if db_name is not None:
                    return cls.connection[db_name]
                else:
                    # 正常情况下,cls.connection不为空的时候,cls.db_name
                    # 也不为空
                    return cls.connection[cls.db_name]
            else:
                with open(config_file) as cf:
                    buffer = cf.read()
                    jn = json.loads(buffer)
                    db_ = jn[location]
                fields = db_.keys()
                uri = 'mongodb://'
                if 'username' in fields and 'password' in fields:
                    uri += '{username}:{password}@'.format(username=db_['username'],
                                                           password=db_['password'])
                # ip是必须的
                uri += db_['host']
                # if 'port' in fields:
                #     uri += ':%s' % db_['port']
                uri += '/?connectTimeoutMS=2000'
                if 'replicaset' in fields:
                    uri += ';replicaSet=%s' % db_['replicaset']
                if 'dbauth' in fields:
                    uri += ';authSource=%s' % db_['dbauth']

                print('new db-uri:', uri)
                connection = MongoClient(uri)
                tn = db_name if db_name is not None else db_['dbname']
                cls.location = location
                cls.connection = connection
                cls.db_name = tn
                return connection[tn]
        except Exception:
            raise Exception('connection MongoDB raise a error')

有了上面的步骤,我们就获得了一个即能访问单点服务器,有能访问复制集、分片服务器的连接对象了。下面就是访问数据库表的数据了,在这里我们创建一个base_model.py文件,它扮演的角色类似于controller,可以完成对数据库中所有表的映射,这其实更多的得益于NoSQL的特性。

# -*- coding: utf-8 -*-
import pymongo
from bson import ObjectId

from .db import MongoDB, ASCENDING, DESCENDING, mongodb, connection


class BaseModel(object):
    """
    _id 是 mongo 自带的,必须有这个字段
    其余 __fields__  的固定属性,未来会逐步添加
        classtype 是类名的小写
    """
    __fields__ = [
        '_id',
        # (字段名, 类型)
        # ('classtype', str),
    ]

    def __init__(self, tn=None, location=None, dbname=None):
        name = self.__class__.__name__
        self.tablename = tn.strip() if tn is not None and len(tn) else name.lower()
        if location is None and dbname is None:
            if mongodb is not None:
                self.mc = mongodb[self.tablename]
            else:
                raise Exception('Unable to find available dbname')
        elif location is None and dbname is not None:
            self.mc = connection[dbname][self.tablename]
        else:
            self.location = location
            self.dbname = dbname
            self.mc = MongoDB.db_connection(self.location, self.dbname)[self.tablename]
    def insert(self, *args, **kwargs):
        """
        插入一条数据
        """
        _ = kwargs if len(kwargs) else args[0]
        _['classtype'] = self.tablename
        # 去掉 _id 这个特殊的字段
        if '_id' in _:
            _['_id'] = ObjectId()

        m = self.mc.insert_one(_)
        return m

    def insert_batch(self, *args):
        """
        批量插入数据
        """
        _ = list()
        if len(args) == 1:
            _ = args[0]
            if isinstance(_, list):
                pass
            else:
                _ = [_]

        elif len(args) > 1:
            _ = args

        result = []
        for i in _:
            if '_id' in i:
                i['_id'] = ObjectId()
                # del i['_id']
            i['classtype'] = self.tablename.lower()
        try:
            if len(_):
                result = self.mc.insert_many(_)
        except pymongo.errors.BulkWriteError as e:
            if isinstance(_, list):
                r = _[0]
            else:
                r = _
            #log('insert_batch', self.tablename, r, msg=e.details['writeErrors'])

    def query(self, sql=None, field=None):
        """
        数据查询
        返回 list
        找不到则返回 []
        """
        # _ = kwargs if len(kwargs) else args[0] if len(args) else None
        ds = self.mc.find(sql, projection=field)
        return ds

    def aggregate(self, pipeline, allowDiskUse=True):
        """
        聚合函数
        :param pipeline: list 聚合表达式
        :param allowDiskUse: 运行使用磁盘来处理超过100M的数据
        :return: 
        """
        return self.mc.aggregate(pipeline, allowDiskUse=allowDiskUse)

    def query_one(self, sql=None, field=None):
        """
        查找并返回个元素
        找不到就返回 None
        """
        # _ = kwargs if len(kwargs) else args[0]
        l = self.mc.find_one(sql, projection=field)
        return l

    def update(self, cond, form):
        """
        """
        self.mc.find_one_and_update(cond, {"<span class="MathJax_Preview" style="color: inherit;"></span><span class="MathJax" id="MathJax-Element-1-Frame" tabindex="0" data-mathml="<math xmlns=&quot;http://www.w3.org/1998/Math/MathML&quot;><merror><mtext>set&amp;quot;:&amp;#xA0;form},&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;upsert=False)&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;def&amp;#xA0;update_batch(self,&amp;#xA0;condition,&amp;#xA0;form):&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;quot;&amp;quot;&amp;quot;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#x6279;&amp;#x91CF;&amp;#x66F4;&amp;#x65B0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;:param&amp;#xA0;condition:&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;:param&amp;#xA0;form:&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;:return:&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;quot;&amp;quot;&amp;quot;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;&amp;#xA0;return&amp;#xA0;self.mc.update_many(condition,&amp;#xA0;{&amp;quot;</mtext></merror></math>" style=""><span class="math" id="MathJax-Span-1" aria-hidden="true" style="vertical-align: 0.231em;"><span class="noError" id="MathJax-Span-2" style="display: inline-block;">set":&nbsp;form},<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;upsert=False)<br><br>&nbsp;&nbsp;&nbsp;&nbsp;def&nbsp;update_batch(self,&nbsp;condition,&nbsp;form):<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;"""<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;批量更新<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;:param&nbsp;condition:<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;:param&nbsp;form:<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;:return:<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;"""<br>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return&nbsp;self.mc.update_many(condition,&nbsp;{"</span></span><span class="MJX_Assistive_MathML" aria-readonly="true"><math xmlns="http://www.w3.org/1998/Math/MathML"><merror><mtext>set":&nbsp;form},&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;upsert=False)&nbsp;&nbsp;&nbsp;&nbsp;def&nbsp;update_batch(self,&nbsp;condition,&nbsp;form):&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;"""&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;批量更新&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;:param&nbsp;condition:&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;:param&nbsp;form:&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;:return:&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;"""&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return&nbsp;self.mc.update_many(condition,&nbsp;{"</mtext></merror></math></span></span><script type="math/tex" id="MathJax-Element-1">set": form},
                                                        upsert=False)

    def update_batch(self, condition, form):
        """
        批量更新
        :param condition:
        :param form:
        :return:
        """
        return self.mc.update_many(condition, {"</script>set": form})
        pass

    def distinct(self, field, sql=None):
        return self.query(sql=sql).distinct(field)

    def remove(self, *args, **kwargs):
        _ = kwargs if len(kwargs) else args[0]
        result = self.mc.delete_many(_)

分享好友

分享这个小栈给你的朋友们,一起进步吧。

MongoDB资料专区
创建时间:2020-05-08 13:54:47
MongoDB是一个介于关系数据库和非关系数据库之间的产品。MongoDB是一个基于分布式文件存储 [1] 的数据库。由C++语言编写。旨在为WEB应用提供可扩展的高性能数据存储解决方案。
展开
订阅须知

• 所有用户可根据关注领域订阅专区或所有专区

• 付费订阅:虚拟交易,一经交易不退款;若特殊情况,可3日内客服咨询

• 专区发布评论属默认订阅所评论专区(除付费小栈外)

技术专家

查看更多
  • 小雨滴
    专家
戳我,来吐槽~