——本篇文章 by 。大咖
MongoDB是由C++语言编写的一个基于分布式文件存储的开源NoSQL数据库系统,它提供了可扩展的高性能数据存储解决方案。
由于MongoDB是一个面向文档存储的数据库,所以操作起来比较简单和容易。MongoDB支持丰富的查询表达式,高性能的插入与查询操作,并在负载调节方面提供了较好的支持。MongoDB的这些特性很好地适应了量化策略开发项目对证券数据存储的需求。本文利用聚宽平台的本地化API获取证券数据,并以K线数据为例,演示了如何将获取到的K线数据存储在本地。
我们首先建立一个访问数据库的配置文件(这是一个json文件),这是为了方便在项目中同时访问多个数据源。
{
"default": {
// 这是一个常规的配置方案
// 把这个配置命名为"default"是有专门考量的,后文会提到这个问题
"host": "127.0.0.1:57017",
"dbname": "jq",
"username": "user1",
"password": "12345",
"dbauth": "admin"
},
"server1": {
// 当你的数据库没有配置用户角色的时候
// 你只需要填写下面这两个参数即可
"host": "127.0.0.1:57016",
"dbname": "ttt"
},
"server2": {
// 这实际上是配置了一个复制集
// 关于复制集的概念,就不展开了
"host": "127.0.0.1",
"replicaset": "test_rs"
},
"shard_a": {
// 这是配置了一个分片服务器
// 看起来和配置一个常规的数据库没什么区别
"host": "127.0.0.1:40000",
"dbname": "ttt"
}
}
创建好数据库的访问配置文件后,紧接着就是添加一个连接数据库的db.py文件了:
# -*- coding: utf-8 -*-
from pymongo import * # MongoDB的连接库
import json
# 构建一个默认的MongoDB对象置于项目运行的内存当中,避免频繁
# 的创建MongoDB对象
config_file = 'E:/db_config.json' # 上面创建的那个json文件的路径
try:
with open(config_file) as f:
l = f.read()
a = json.loads(l)
db = a['default'] # 默认访问的数据库,上文说过
uri += '/?connectTimeoutMS=2000'
if 'replicaset' in fields:
uri += ';replicaSet=%s' % db['replicaset']
if 'dbauth' in fields:
uri += ';authSource=%s' % db['dbauth']
print('db-uri:', uri)
connection = MongoClient(uri)
if 'dbname' in fields:
mongodb = connection[db['dbname']]
else:
mongodb = None
except Exception:
raise Exception('connection MongoDB raise a error')
class MongoDB:
"""数据库对象"""
connection_count = 0
# 当你在程序中,需要访问另一个数据库,那你就需要再临时创建一个连接对象了
# 为了避免连续访问相同的数据库对象,设置location、connection
# 两个类属性,保留近一次访问的对象
location = None
connection = None
db_name = None
@classmethod
def db_connection(cls, location, db_name=None):
"""
连接到数据库
:param db_name:
:param location:
:return:
"""
try:
if location == cls.location and cls.connection is not None:
# 这个连接对象已经存在
if db_name is not None:
return cls.connection[db_name]
else:
# 正常情况下,cls.connection不为空的时候,cls.db_name
# 也不为空
return cls.connection[cls.db_name]
else:
with open(config_file) as cf:
buffer = cf.read()
jn = json.loads(buffer)
db_ = jn[location]
fields = db_.keys()
uri = 'mongodb://'
if 'username' in fields and 'password' in fields:
uri += '{username}:{password}@'.format(username=db_['username'],
password=db_['password'])
# ip是必须的
uri += db_['host']
# if 'port' in fields:
# uri += ':%s' % db_['port']
uri += '/?connectTimeoutMS=2000'
if 'replicaset' in fields:
uri += ';replicaSet=%s' % db_['replicaset']
if 'dbauth' in fields:
uri += ';authSource=%s' % db_['dbauth']
print('new db-uri:', uri)
connection = MongoClient(uri)
tn = db_name if db_name is not None else db_['dbname']
cls.location = location
cls.connection = connection
cls.db_name = tn
return connection[tn]
except Exception:
raise Exception('connection MongoDB raise a error')
有了上面的步骤,我们就获得了一个即能访问单点服务器,有能访问复制集、分片服务器的连接对象了。下面就是访问数据库表的数据了,在这里我们创建一个base_model.py文件,它扮演的角色类似于controller,可以完成对数据库中所有表的映射,这其实更多的得益于NoSQL的特性。
# -*- coding: utf-8 -*-
import pymongo
from bson import ObjectId
from .db import MongoDB, ASCENDING, DESCENDING, mongodb, connection
class BaseModel(object):
"""
_id 是 mongo 自带的,必须有这个字段
其余 __fields__ 的固定属性,未来会逐步添加
classtype 是类名的小写
"""
__fields__ = [
'_id',
# (字段名, 类型)
# ('classtype', str),
]
def __init__(self, tn=None, location=None, dbname=None):
name = self.__class__.__name__
self.tablename = tn.strip() if tn is not None and len(tn) else name.lower()
if location is None and dbname is None:
if mongodb is not None:
self.mc = mongodb[self.tablename]
else:
raise Exception('Unable to find available dbname')
elif location is None and dbname is not None:
self.mc = connection[dbname][self.tablename]
else:
self.location = location
self.dbname = dbname
self.mc = MongoDB.db_connection(self.location, self.dbname)[self.tablename]
def insert(self, *args, **kwargs):
"""
插入一条数据
"""
_ = kwargs if len(kwargs) else args[0]
_['classtype'] = self.tablename
# 去掉 _id 这个特殊的字段
if '_id' in _:
_['_id'] = ObjectId()
m = self.mc.insert_one(_)
return m
def insert_batch(self, *args):
"""
批量插入数据
"""
_ = list()
if len(args) == 1:
_ = args[0]
if isinstance(_, list):
pass
else:
_ = [_]
elif len(args) > 1:
_ = args
result = []
for i in _:
if '_id' in i:
i['_id'] = ObjectId()
# del i['_id']
i['classtype'] = self.tablename.lower()
try:
if len(_):
result = self.mc.insert_many(_)
except pymongo.errors.BulkWriteError as e:
if isinstance(_, list):
r = _[0]
else:
r = _
#log('insert_batch', self.tablename, r, msg=e.details['writeErrors'])
def query(self, sql=None, field=None):
"""
数据查询
返回 list
找不到则返回 []
"""
# _ = kwargs if len(kwargs) else args[0] if len(args) else None
ds = self.mc.find(sql, projection=field)
return ds
def aggregate(self, pipeline, allowDiskUse=True):
"""
聚合函数
:param pipeline: list 聚合表达式
:param allowDiskUse: 运行使用磁盘来处理超过100M的数据
:return:
"""
return self.mc.aggregate(pipeline, allowDiskUse=allowDiskUse)
def query_one(self, sql=None, field=None):
"""
查找并返回个元素
找不到就返回 None
"""
# _ = kwargs if len(kwargs) else args[0]
l = self.mc.find_one(sql, projection=field)
return l
def update(self, cond, form):
"""
"""
self.mc.find_one_and_update(cond, {"<span class="MathJax_Preview" style="color: inherit;"></span><span class="MathJax" id="MathJax-Element-1-Frame" tabindex="0" data-mathml="<math xmlns="http://www.w3.org/1998/Math/MathML"><merror><mtext>set&quot;:&#xA0;form},&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;upsert=False)&#xA0;&#xA0;&#xA0;&#xA0;def&#xA0;update_batch(self,&#xA0;condition,&#xA0;form):&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&quot;&quot;&quot;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#x6279;&#x91CF;&#x66F4;&#x65B0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;:param&#xA0;condition:&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;:param&#xA0;form:&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;:return:&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&quot;&quot;&quot;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;return&#xA0;self.mc.update_many(condition,&#xA0;{&quot;</mtext></merror></math>" style=""><span class="math" id="MathJax-Span-1" aria-hidden="true" style="vertical-align: 0.231em;"><span class="noError" id="MathJax-Span-2" style="display: inline-block;">set": form},<br> upsert=False)<br><br> def update_batch(self, condition, form):<br> """<br> 批量更新<br> :param condition:<br> :param form:<br> :return:<br> """<br> return self.mc.update_many(condition, {"</span></span><span class="MJX_Assistive_MathML" aria-readonly="true"><math xmlns="http://www.w3.org/1998/Math/MathML"><merror><mtext>set": form}, upsert=False) def update_batch(self, condition, form): """ 批量更新 :param condition: :param form: :return: """ return self.mc.update_many(condition, {"</mtext></merror></math></span></span><script type="math/tex" id="MathJax-Element-1">set": form},
upsert=False)
def update_batch(self, condition, form):
"""
批量更新
:param condition:
:param form:
:return:
"""
return self.mc.update_many(condition, {"</script>set": form})
pass
def distinct(self, field, sql=None):
return self.query(sql=sql).distinct(field)
def remove(self, *args, **kwargs):
_ = kwargs if len(kwargs) else args[0]
result = self.mc.delete_many(_)