- 浏览: 983572 次
- 性别:
- 来自: 上海
文章分类
- 全部博客 (832)
- 系统集成、架构 (116)
- 前端负载、开发、db (139)
- 监控与安全、性能 (70)
- Trouble Shooting (45)
- 管理哲学、业余提升 (9)
- 经典转载 (8)
- 主导过项目演示 (37)
- docker集群 (9)
- jvm 性能参数 (7)
- 监控平台 (28)
- 发布管理平台 (6)
- 日志平台 (53)
- Tools (36)
- shell & pytho &ansible自动化运维平台 (20)
- mongodb&nosql&db (32)
- system (12)
- kafa zk (11)
- python&nodejs (311)
- 数据平台 (5)
- PAAS (0)
- IAAS (0)
- SAAS (0)
- Go (2)
- kotlin (2)
- Redis学习笔记4-脚本、持久化和集群 (2)
最新评论
-
hsluoyz:
PyCasbin 是一个用 Python 语言打造的轻量级开源 ...
django guardian 对象级别权限设计 -
phncz310:
厉害了,我的哥
python黑魔法异常重试的次数,间隔的装饰器涵数 -
adamoooo:
Zabbix二次开发,可以试试:乐维监控www.91lewei ...
zabbix二次开发及app -
shi3689476:
你好,模块下载地址还能提供一下吗?
NGINX开发杀手锏-线程池 -
tobato:
Elasticsearch 和 Influxdb 为何选了El ...
elastic作数据源,对比kibana与grafana
简单说明:
此模块儿常用来把关系数据库的表结构映射到对象上,允许开发人员首先设计数据模型,并能决定稍候可视化数据的方式(CLI/WEB/GUI),和以往的先绝对如何在框架允许的范围内使用数据模型的开发方法完全相反,它兼容众多数据库(SQLite/MySQL/Postgres/Oracle/MS-SQL/SQLServer/Firebird)等(http://www.sqlalchemy.org/organizations.html)
会话实例:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://my.oschina.net/pydevops/
# Purpose:
#
"""
import pprint
# 创建引擎
from sqlalchemy import create_engine
# 创建会话类
from sqlalchemy.orm import sessionmaker
def main():
"""Main function.
"""
# mysqldb是数据库引擎,root:root@127.0.0.1分别为帐号:密码@主机,xmdevops为数据库,charset为字符集
sql_connect_str = ('mysql+mysqldb://root:root@127.0.0.1/'
'xmdevops?charset=utf8')
# 通过连接字符串创建一个引擎,echo=True会通过logging模块儿来输出日志
sql_engine = create_engine(sql_connect_str, echo=True)
# 基于数据库引擎创建会话类
SessionCls = sessionmaker(bind=sql_engine)
# 实例化一个会话,其实创建了默认为5个连接的连接池
sql_session = SessionCls()
s.execute(clause, params=None, mapper=None, bind=None, **kw) -> ResultProxy
说明: 执行SQL语句
s.commit() -> None
说明:提交
s.rollback()
说明:回滚
s.close() -> None
说明:关闭内存中的当前会话
s.close_all() -> None
说明:关闭内存中的所有会话
游标实例:
r.keys() -> list
说明:列表形式返回当前解决集的所有列字段
r.closed -> bool
说明:判断游标是否关闭
r.close() -> None
说明:关闭游标
r.fetchall() -> list
说明:游标获取所有结果集
r.fetchone() -> list
说明:游标获取一行结果集
r.fetchmany(size=None) -> list
说明:游标获取多行结果集
r.first() -> list
说明:游标获取结果集第一行
r.rowcount -> int
说明:返回游标结果集行数
r.__dict__
说明:返回游标记过集的字典形式
定义映射:
1.经典映射模式,需要先描述这个表,例如要描述如下表结构
CREATE TABLE [users] ([id] INTEGER PRIMARY KEY,
[name] TEXT NOT NULL,
[fullname] TEXT NOT NULL,
[password] TEXT NOT NULL);
第一步骤:描述表结构
from sqlalchemy.orm import mapper
from sqlalchemy import (Table, MetaData, Column,
CHAR, Text,
VARCHAR, Integer, String)
metadata = MetaData()
user = Table('users',
metadata,
Column('u_id', Integer, primary_key=True),
Column('u_name', Text),
Column('u_fullname', Text),
Column('u_password', Text))
第二步骤:定义映射类
class User(object):
def __init__(self, u_name, u_fullname, u_password):
self.u_name = u_name
self.u_fullname = u_fullname
self.u_password = u_password
第三步骤:绑映射描述
mapper(User, user)
说明:上面的方式是SB的原始方式,当大家都在乐此不疲的定义描述表,定义类再实现ORM的时候,SQLAlchemy团队搞出更加简单的映射方法,就是现代模式,只需要定义映射类就可以一次性完成所有的任务
2.现代映射模式
1.为了定义的类能够被SQLAlchemy管理,所以引入第三方类Declarative,我们所有定义的映射类都必须是它的子类
第一步骤:生成公共基类
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
说明:一个程序内,基类最好的唯一的,建议存储在全局变量如上Base中供所有映射类使用,通过
基类可以定义N多的映射子类,这些子类都能被SQLAlchemy Declarative管理
第二步骤:定义映射类
from sqlalchemy import (Column, CHAR, VARCHAR,
Integer, String, Text)
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
u_id = Column(Integer, primary_key=True)
u_name = Column(Text)
u_fullname = Column(Text)
u_password = Column(Text)
def __init__(self, u_name, u_fullname, u_password):
self.u_name = u_name
self.u_fullname = u_fullname
self.u_password = u_password
def __repr__(self):
return '<User(%s, %s, %s)>' % (self.u_name,
self.u_fullname,
self.u_password)
说明:如上代码就完成了先前经典模式所需三步,代码比原先更简洁和容易管理,同刚才经典模式的Table定义的Column,这个代表数据库表中的列,Text/Integer代表数据库表的字段类型,这样User类就建立起与数据库表的映射,真实的名字可以使用__tablename__指明,然后是表列的集合,包括id,name,fullname,password,通过primary_key=True已经指明id为主键,ORM为了能够实际映射表需要至少一个列被定义为主键列,多列当然也支持,其实上面的魔术方法__init__()/__repr__()都是可选的,可以手动加入任意方法或属性,唯一需要注意的是必须继承Base基类,保证SQLAlchary Declarative可管理这些映射类和数据库表.随着User映射类通过Declaractive_base系统构造成功,我们就拥有了相关的定义信息,例如经典模式中的Table()描述,也包含映射到表中的类,User自身,可以通过User.__table__来获取表描述,User.__mapper__来获取映射对象,Base.metedata来获取metedata对象,如果想将定义好数据结构实现为实体数据库,只需要Base.metedata.create_all(engine)即可
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://my.oschina.net/pydevops/
# Purpose:
#
"""
import pprint
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, Text, create_engine
def main():
"""Main function.
"""
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
u_id = Column(Integer, primary_key=True)
u_name = Column(Text)
u_fullname = Column(Text)
u_password = Column(Text)
# 获取经典模式下的Table()表描述信息
pprint.pprint(repr(User.__table__))
# 获取经典模式下mapper()表和类映射关系
pprint.pprint(repr(User.__mapper__))
# 将表结构写入数据库
sql_conn_str = 'mysql+mysqldb://root:root@127.0.0.1/xmdevops?charset=utf8'
sql_engine = create_engine(sql_conn_str, echo=True)
Base.metadata.create_all(sql_engine)
if __name__ == '__main__':
main()
会话创建:
1.创建类的实例,declearative会自动检测子类构造方法,如果没有定义会自动定义,主键默认为None,因为数据对象未持久化存储
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://my.oschina.net/pydevops/
# Purpose:
#
"""
import pprint
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, Text, create_engine
def main():
"""Main function.
"""
# 创建基类
Base = declarative_base()
# 定义表结构
class User(Base):
__tablename__ = 'users'
u_id = Column(Integer, primary_key=True)
u_name = Column(Text)
u_fullname = Column(Text)
u_password = Column(Text)
""" 可以省略__init__()构造方法,因为Base基类会检测,缺少构造方法会自动补上,
但其提供的构造方法建议使用键值对的参数访问方式,包含我们用Column定义映射的列
"""
def __init__(self, u_name, u_fullname, u_password):
self.u_name = u_name
self.u_fullname = u_fullname
self.u_password = u_password
# 实例映射类(id也可以传入,通常意义上这类主键由系统自动维护,无需为其赋值)
user_manman = User(u_name='lmm', u_fullname='limanman', u_password='lm_521314_lz')
# id为None是因为没有对对象持久化存储
print user_manman.u_id
print user_manman.u_name
print user_manman.u_fullname
print user_manman.u_password
if __name__ == '__main__':
main()
2.ORM的操作句柄被称为会话SESSION,为了使用会话需要首先创建会话实例(先建立引擎然基于引擎创建会话,其实先建立会话再绑定引擎也是可以滴)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://my.oschina.net/pydevops/
# Purpose:
#
"""
import pprint
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
def main():
"""Main function.
"""
sql_conn_str = 'mysql+mysqldb://root:root@127.0.0.1/xmdevops?charset=utf8'
# 方法一: 先创建引擎再基于引擎创建会话
sql_engine = create_engine(sql_conn_str)
SQLSession = sessionmaker(bind=sql_engine)
sql_session = SQLSession()
# 方法二: 先创建会话再绑定引擎
SQLSession = sessionmaker()
SQLSession.configure(bind=sql_engine)
sql_session = SQLSession()
if __name__ == '__main__':
main()
说明:到此就获取了由engine维护的数据库连接池,并且会维持内存中的映射数据直到commit提交或是更改或是关闭会话对象
添加对象:
1.要想将映射到实体表中的映射类如User持久化,需要将这个User类建立的对象实例添加到上面创建好的会话实例中
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://my.oschina.net/pydevops/
# Purpose:
#
"""
import pprint
# 创建基类
from sqlalchemy.ext.declarative import declarative_base
# 创建会话类
from sqlalchemy.orm import sessionmaker
# 创建表结构
from sqlalchemy import Text, Integer, Column, create_engine
Base = declarative_base()
# 定义uses表结构
class User(Base):
__tablename__ = 'users'
u_id = Column(Integer, primary_key=True)
u_name = Column(Text)
u_fullname = Column(Text)
u_password = Column(Text)
# 省略__init__()构造,Base自动构造
# 获取表结构(比默认打印更直观)
def __repr__(self):
return '<User(%s, %s, %s)>' % (self.u_name,
self.u_fullname,
self.u_password)
# 创建数据引擎
def create_sql_engine(uname, upass, rhost, rdatabase,
rport=3306, rcharset='utf8'):
"""Create database engine from common use.
Args:
uname : sql user name
upass : sql user pass
rhost : remote sql host
rport : remote sql port
rdatabase: remote sql database
rcharset : remote sql charset, default utf8
Returns:
engine
"""
sql_conn_str = 'mysql+mysqldb://%s:%s@%s:%s/%s?charset=%s' % (uname,
upass,
rhost,
rport,
rdatabase,
rcharset)
sql_engine = create_engine(sql_conn_str, echo=True)
return sql_engine
# 创建会话实例
def create_orm_session(sql_engine):
"""Create database session base sql engine.
Returns:
session
"""
SQLSession = sessionmaker(bind=sql_engine)
sql_session = SQLSession()
return sql_session
# 创建表结构
def create_table_struct(sql_engine):
"""Create table struct above.
Args:
sql_engine: sql engine api
Returns:
None
"""
Base.metadata.create_all(sql_engine)
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 一次添加单个对象
usr_lmm = User(u_name='lmm', u_fullname='limanman', u_password='lm_521314_lz')
orm_session.add(usr_lmm)
# 一次添加多个对象
orm_session.add_all([
User(u_name='lmm1', u_fullname='limanman1', u_password='lm_521314_lz1'),
User(u_name='lmm2', u_fullname='limanman2', u_password='lm_521314_lz2'),
User(u_name='lmm3', u_fullname='limanman3', u_password='lm_521314_lz3')
])
# 查询测试LazyLoad策略
orm_session.query(User).filter_by(u_name='lmm').first()
说明:SQLAlchemy采用Lazyload策略,add到orm_session此时usr_lmm只是被标记为Pending准备状态,并没有执行任何可能导致数据库变化的SQL语句,只有查询对象,对象的一个属性或显示调用flush时才会将Pending状态的数据写入数据表,如下我们用Query对象结合日志来说明,Query对象返回的是一个User实例,就是我们之前持久化的.由于我们指定的echo=True,如下的调试信息中可以看到其实先Insert然后再Select,也就是说你在实际操作持久化数据时才会由延迟加载真正触发数据库操作
2016-04-22 19:35:58,393 INFO sqlalchemy.engine.base.Engine INSERT INTO users (u_name, u_fullname, u_password) VALUES (%s, %s, %s)
2016-04-22 19:35:58,393 INFO sqlalchemy.engine.base.Engine ('lmm', 'limanman', 'lm_521314_lz')
2016-04-22 19:35:58,396 INFO sqlalchemy.engine.base.Engine INSERT INTO users (u_name, u_fullname, u_password) VALUES (%s, %s, %s)
2016-04-22 19:35:58,397 INFO sqlalchemy.engine.base.Engine ('lmm1', 'limanman1', 'lm_521314_lz1')
2016-04-22 19:35:58,400 INFO sqlalchemy.engine.base.Engine INSERT INTO users (u_name, u_fullname, u_password) VALUES (%s, %s, %s)
2016-04-22 19:35:58,401 INFO sqlalchemy.engine.base.Engine ('lmm2', 'limanman2', 'lm_521314_lz2')
2016-04-22 19:35:58,403 INFO sqlalchemy.engine.base.Engine INSERT INTO users (u_name, u_fullname, u_password) VALUES (%s, %s, %s)
2016-04-22 19:35:58,403 INFO sqlalchemy.engine.base.Engine ('lmm3', 'limanman3', 'lm_521314_lz3')
2016-04-22 19:35:58,410 INFO sqlalchemy.engine.base.Engine SELECT users.u_id AS users_u_id, users.u_name AS users_u_name, users.u_fullname AS users_u_fullname, users.u_password AS users_u_password
FROM users
WHERE users.u_name = %s
LIMIT %s
2016-04-22 19:35:58,410 INFO sqlalchemy.engine.base.Engine ('lmm', 1)
<User(lmm, limanman, lm_521314_lz)>
2.如果要将orm缓存数据显式的提交反馈到数据库里,需要调用orm_session.commit()告诉session目前的添加和改动,操作完成后session引用的数据库连接资源被回收到连接池,接下来对于这个session的操作会触发一个新的事物申请新的连接池连接资源
3.session是作为事务来工作,所以我们可以回滚先前所做的更改,查询时首先会在标识映射表中查询,查询不到才会去实体数据库中查找
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 一次添加单个对象
usr_lmm = User(u_name='lmm', u_fullname='limanman', u_password='lm_521314_lz')
# 将usr_lmm在标识映射表中标识为Pending状态
orm_session.add(usr_lmm)
# 更改实例属性
usr_lmm.u_name = 'lzz'
# 依据名称查询,触发持久化操作,将Pending状态的对象写入数据库,但此时并没commit,所以实体数据库中必然没有记录
print '------------------------------------------------------'
print orm_session.query(User).filter_by(u_name='lzz').first()
print '------------------------------------------------------'
# 回滚之前的更改
orm_session.rollback()
# 回滚后数据未持久化,此时查询的数据来自于实体数据库
print orm_session.query(User).filter_by(u_name='lmm').first()
多表关联:
1.实际生产中数据存储多表联查,常用的1-N(一对多)关系(再创建一个邮件表,每个用户可以关联一个或是多个邮件地址)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://my.oschina.net/pydevops/
# Purpose:
#
"""
import pprint
# 创建基类
from sqlalchemy.ext.declarative import declarative_base
# 创建会话类
from sqlalchemy.orm import sessionmaker, aliased, relationship, backref
# 创建表结构
from sqlalchemy import Text, Integer, Column, create_engine, func, ForeignKey
Base = declarative_base()
# 定义uses表结构
class User(Base):
__tablename__ = 'users'
u_id = Column(Integer, primary_key=True)
u_name = Column(Text)
u_fullname = Column(Text)
u_password = Column(Text)
# 省略__init__()构造,Base自动构造
# 获取表结构(比默认打印更直观)
def __repr__(self):
return '<User(%s, %s, %s)>' % (self.u_name,
self.u_fullname,
self.u_password)
# 定义Address表结构
class Address(Base):
__tablename__ = 'addresses'
a_id = Column(Integer, primary_key=True)
a_email = Column(Text, nullable=False)
u_id = Column(Integer, ForeignKey('users.u_id'))
"""
通过relationship实现Address.users获取用户实例集,back_populates='addresses'
必须双向指定,backref='addresses'作用相同但是可以单向指定,双向引用,addresses任意指定
主要是通过User.addresses获取邮件实例集
"""
users = relationship('User', backref='addresses')
def __repr__(self):
return '<Address(%s)>' % (self.a_email)
# 创建数据引擎
def create_sql_engine(uname, upass, rhost, rdatabase,
rport=3306, rcharset='utf8'):
"""Create database engine from common use.
Args:
uname : sql user name
upass : sql user pass
rhost : remote sql host
rport : remote sql port
rdatabase: remote sql database
rcharset : remote sql charset, default utf8
Returns:
engine
"""
sql_conn_str = 'mysql+mysqldb://%s:%s@%s:%s/%s?charset=%s' % (uname,
upass,
rhost,
rport,
rdatabase,
rcharset)
sql_engine = create_engine(sql_conn_str, echo=True)
return sql_engine
# 创建会话实例
def create_orm_session(sql_engine):
"""Create database session base sql engine.
Returns:
session
"""
SQLSession = sessionmaker(bind=sql_engine)
sql_session = SQLSession()
return sql_session
# 创建表结构
def create_table_struct(sql_engine):
"""Create table struct above.
Args:
sql_engine: sql engine api
Returns:
None
"""
Base.metadata.create_all(sql_engine)
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 添加用户记录
usr_lmm = User(u_name='lmm', u_fullname='limanman', u_password='lm_521314_lm')
# 添加地址对象
usr_lmm.addresses = [
Address(a_email='1784763743@qq.com'),
Address(a_email='xmdevops@vip.qq.com'),
]
# 添加到会话,提交
orm_session.add(usr_lmm)
orm_session.commit()
# 通过正反向关联获取用户信息
last_user = orm_session.query(User).filter(User.u_name=='lmm').all()[-1]
print 'user: %s' % (last_user.u_name)
for cur_a_email in last_user.addresses:
print cur_a_email.a_email
2.实际生产中数据存储多表联查,常用的N-N(多对多)关系(创建一个主机表和主机组表,一个主机可以属于多个表,一个主机组可以包含多个主机)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://my.oschina.net/pydevops/
# Purpose:
#
"""
import pprint
# 创建基类
from sqlalchemy.ext.declarative import declarative_base
# 创建会话类
from sqlalchemy.orm import sessionmaker, aliased, relationship, backref
# 创建表结构
from sqlalchemy import (Text, Integer, String, Table,
Column, create_engine, func, ForeignKey)
Base = declarative_base()
# 主机-2-主机组类
# host_id + group_id 唯一不重复(一个主机可以属于多个主机组),关联另外两个表
host_2_group = Table('host_2_group', Base.metadata,
Column('host_id', Integer, ForeignKey('host.id'),
autoincrement=False, primary_key=True),
Column('group_id', Integer, ForeignKey('group.id'),
autoincrement=False, primary_key=True))
# 主机类
class Host(Base):
__tablename__ = 'host'
id = Column(Integer,
unique=True,
nullable=False,
autoincrement=True,
primary_key=True)
hostname = Column(String(32),
nullable=False)
haddress = Column(String(128),
nullable=False)
hostport = Column(Integer,
nullable=False)
password = Column(String(64),
nullable=True)
auth_key = Column(String(256),
nullable=True)
# User基于host_2_group来关联group表,backref反向引用,则Group基于host_2_group来关联host表
groups = relationship('Group', backref='hosts', secondary=host_2_group)
def __repr__(self):
return ('<Host(id: %s hostname: %s haddress: %s password: %s'
'hostport: %s auth_key: %s)>') % ( self.id, self.hostname,
self.haddress, self.hostport,
self.password, self.auth_key)
# 主机组类
class Group(Base):
__tablename__ = 'group'
id = Column(Integer,
unique=True,
nullable=False,
autoincrement=True,
primary_key=True)
groupname = Column(String(32),
nullable=False)
def __repr__(self):
return '<Group(id: %s groupname: %s)>' % (self.id, self.groupname)
# 创建数据引擎
def create_sql_engine(uname, upass, rhost, rdatabase,
rport=3306, rcharset='utf8'):
"""Create database engine from common use.
Args:
uname : sql user name
upass : sql user pass
rhost : remote sql host
rport : remote sql port
rdatabase: remote sql database
rcharset : remote sql charset, default utf8
Returns:
engine
"""
sql_conn_str = 'mysql+mysqldb://%s:%s@%s:%s/%s?charset=%s' % (uname,
upass,
rhost,
rport,
rdatabase,
rcharset)
sql_engine = create_engine(sql_conn_str, echo=True)
return sql_engine
# 创建会话实例
def create_orm_session(sql_engine):
"""Create database session base sql engine.
Returns:
session
"""
SQLSession = sessionmaker(bind=sql_engine)
sql_session = SQLSession()
return sql_session
# 创建表结构
def create_table_struct(sql_engine):
"""Create table struct above.
Args:
sql_engine: sql engine api
Returns:
None
"""
Base.metadata.create_all(sql_engine)
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 添加主机组
#xm_newnatserver = Group(groupname='newnatserver')
#xm_statushelper = Group(groupname='statushelper')
#orm_session.add_all([
#xm_newnatserver,
#xm_statushelper
#])
# 添加主机
#group_instances = orm_session.query(Group).all()
#for grp_instance in group_instances:
#for i in xrange(1, 3):
#h_hostname = 'xm-server-%s' % (i)
#cur_host = Host(hostname=h_hostname,
#haddress='127.0.0.1',
#hostport=22,
#password='lm_521314_lz',
#auth_key=None)
## 关联主机和主机组
#cur_host.groups = [grp_instance,]
#orm_session.add(cur_host)
# 查询每个主机组中的主机
for cur_grp in orm_session.query(Group).all():
grp_hosts = cur_grp.hosts
print '%s' % (cur_grp),
for cur_host in grp_hosts:
print '''
%s''' % (cur_host),
# 查询每个主机所属主机组
for cur_host in orm_session.query(Host).all():
usr_group = cur_host.groups
print '%s -> %s' % (cur_host, usr_group)
orm_session.commit()
增删查改:
1.Query对象其实返回的是上面添加到session里面的可迭代的实例对象列表,Query对象的每一步操作依然返回Query对象,体现了它串联强大特性
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
for cur_instance in orm_session.query(User).order_by(User.u_id):
print '''%s
u_id : %s
u_name : %s
u_fullname: %s
u_password: %s''' % (cur_instance, cur_instance.u_id,
cur_instance.u_name,
cur_instance.u_fullname, cur_instance.u_password)
# 增加记录项 - add(instance)/add_all([instance1,...,instancen])
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 添加多项记录
orm_session.add_all([
User(u_name='lmm', u_fullname='limanman', u_password='lm_521314_lm'),
User(u_name='lzz', u_fullname='liuzhen', u_password='lz_521314_lz'),
])
orm_session.commit()
# 删除记录项 - filter+delete删除过滤项
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 删除符合条件的记录
from sqlalchemy import or_
orm_session.query(User).filter(or_(User.u_name == 'lmm',
User.u_name == 'lzz',)).delete()
orm_session.commit()
# 查询指定列 - 返回元组类型
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 查询指定的列时返回的是元组
for (cur_inst,
u_name,
u_fullname,
u_password) in orm_session.query(User, User.u_name,
User.u_fullname, User.u_password):
print '''%s
u_name : %s
u_fullname: %s
u_password: %s''' % (cur_inst, u_name, u_fullname, u_password)
# 设置列别名 - 返回元组类型
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 查询指定的列时返回的是元组
for u_name in orm_session.query(User.u_name.label('name')).all():
print u_name
# 设置类别名 - 多次引用起别名
from sqlalchemy.orm import aliased
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 起别名
u = aliased(User, name='u')
# 查询指定的列时返回的是元组
for (u_inst,
u_name,
u_fullname,
u_password) in orm_session.query(u, u.u_name,
u.u_fullname,
u.u_password):
print '''%s
u_name : %s
u_fullname: %s
u_password: %s''' % (u, u_name, u_fullname, u_password)
# 排序限制 - 返回列表类型
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 结合order_by一起使用,数组分片实现
for cur_instance in orm_session.query(User).order_by(User.u_id)[1:]:
print '''%s
u_name : %s
u_fullname: %s
u_password: %s''' % (cur_instance,
cur_instance.u_name,
cur_instance.u_fullname,
cur_instance.u_password)
# 返回所有行
pprint.pprint(orm_session.query(User).filter(User.u_name.like('lmm%')).all())
# 返回首行
pprint.pprint(orm_session.query(User).filter(User.u_name.like('lmm%')).first())
# 筛选过滤 - filter_by关键词参数过滤/filter支持PY自身操作符,更强大
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 相等测试
pprint.pprint(orm_session.query(User).filter(User.u_name=='lmm').all())
# 不等测试
pprint.pprint(orm_session.query(User).filter(User.u_name!='lmm').all())
# like测试
pprint.pprint(orm_session.query(User).filter(User.u_name.like('lmm%')).all())
# in 测试
pprint.pprint(orm_session.query(User).filter(User.u_name.in_(['lmm1', 'lmm2', 'lmm3'])).all())
# not in 测试
pprint.pprint(orm_session.query(User).filter(~User.u_name.in_(['lmm1', 'lmm2', 'lmm3'])).all())
# is null测试
pprint.pprint(orm_session.query(User).filter(User.u_name == None).all())
# is not null测试
pprint.pprint(orm_session.query(User).filter(User.u_name != None).all())
# and 测试
# 方法一: and_
from sqlalchemy import and_
pprint.pprint(orm_session.query(User).filter(and_(User.u_name=='lmm', User.u_fullname=='limanman')).all())
# 方法二: 再次filter Query对象
pprint.pprint(orm_session.query(User).filter(User.u_name=='lmm').filter(User.u_fullname=='limanman').all())
# or 测试
from sqlalchemy import or_
pprint.pprint(orm_session.query(User).filter(or_(User.u_name=='lmm', User.u_name.in_(['lmm1', 'lmm2', 'lmm3']))).all())
# 原义SQL - SQL查询字符串作为查询参数,还支持绑定参数
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 绑定参数基于字符串的SQL指派,使用冒号来标记代替参数,然后用params方法指定相应的值
pprint.pprint(orm_session.query(User).filter('u_name=:name and u_fullname=:fullname').params(name='lmm', fullname='limanman').all())
# 极端SQL - 直接使用SQL语句脱离orm框架,不建议使用
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 绑定参数基于SQL语句,使用冒号来标记代替参数,然后用params方法指定相应的值
pprint.pprint(orm_session.query(User).from_statement('select * from users where u_name=:name and u_fullname=:fullname').params(name='lmm', fullname='limanman').all())
# SQL计数 - 返回整型类型
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 返回结果集行数
pprint.pprint(orm_session.query(User).filter(User.u_name.in_(['lmm1', 'lmm2', 'lmm3'])).count())
或
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 返回结果集行数
pprint.pprint(orm_session.query(func.count(User.u_id)).scalar())
# 分组统计 - 返回列表类型
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 按照每个用户分组,统计用户数
pprint.pprint(orm_session.query(User.u_name, func.count(User.u_name).label('u_count')).group_by(User.u_name).all())
# 左右联查 - inner join + group_by
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 添加用户记录
usr_lmm = User(u_name='lmm', u_fullname='limanman', u_password='lm_521314_lm')
# 添加地址对象
usr_lmm.addresses = [
Address(a_email='1784763743@qq.com'),
Address(a_email='xmdevops@vip.qq.com'),
]
# 添加到会话,提交
orm_session.add(usr_lmm)
orm_session.commit()
# inner join - 引用外键表,自动找外键
pprint.pprint(orm_session.query(User).join(User.addresses).all())
# inner join - 手动指定表,自动找外键
pprint.pprint(orm_session.query(User).join(Address).all())
# 连接 - 排序 - 分组
pprint.pprint(orm_session.query(User.u_name, func.count(Address.a_email)).join(Address).filter(User.u_id==Address.u_id).group_by(User.u_name).all())
# 修改记录项 - update更新过滤项
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 修改记录项
orm_session.query(User).filter(User.u_name=='lmm').update({
'u_password': 'lz_521314_lz',
})
orm_session.commit()
reference:
https://my.oschina.net/pydevops/blog/664872
此模块儿常用来把关系数据库的表结构映射到对象上,允许开发人员首先设计数据模型,并能决定稍候可视化数据的方式(CLI/WEB/GUI),和以往的先绝对如何在框架允许的范围内使用数据模型的开发方法完全相反,它兼容众多数据库(SQLite/MySQL/Postgres/Oracle/MS-SQL/SQLServer/Firebird)等(http://www.sqlalchemy.org/organizations.html)
会话实例:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://my.oschina.net/pydevops/
# Purpose:
#
"""
import pprint
# 创建引擎
from sqlalchemy import create_engine
# 创建会话类
from sqlalchemy.orm import sessionmaker
def main():
"""Main function.
"""
# mysqldb是数据库引擎,root:root@127.0.0.1分别为帐号:密码@主机,xmdevops为数据库,charset为字符集
sql_connect_str = ('mysql+mysqldb://root:root@127.0.0.1/'
'xmdevops?charset=utf8')
# 通过连接字符串创建一个引擎,echo=True会通过logging模块儿来输出日志
sql_engine = create_engine(sql_connect_str, echo=True)
# 基于数据库引擎创建会话类
SessionCls = sessionmaker(bind=sql_engine)
# 实例化一个会话,其实创建了默认为5个连接的连接池
sql_session = SessionCls()
s.execute(clause, params=None, mapper=None, bind=None, **kw) -> ResultProxy
说明: 执行SQL语句
s.commit() -> None
说明:提交
s.rollback()
说明:回滚
s.close() -> None
说明:关闭内存中的当前会话
s.close_all() -> None
说明:关闭内存中的所有会话
游标实例:
r.keys() -> list
说明:列表形式返回当前解决集的所有列字段
r.closed -> bool
说明:判断游标是否关闭
r.close() -> None
说明:关闭游标
r.fetchall() -> list
说明:游标获取所有结果集
r.fetchone() -> list
说明:游标获取一行结果集
r.fetchmany(size=None) -> list
说明:游标获取多行结果集
r.first() -> list
说明:游标获取结果集第一行
r.rowcount -> int
说明:返回游标结果集行数
r.__dict__
说明:返回游标记过集的字典形式
定义映射:
1.经典映射模式,需要先描述这个表,例如要描述如下表结构
CREATE TABLE [users] ([id] INTEGER PRIMARY KEY,
[name] TEXT NOT NULL,
[fullname] TEXT NOT NULL,
[password] TEXT NOT NULL);
第一步骤:描述表结构
from sqlalchemy.orm import mapper
from sqlalchemy import (Table, MetaData, Column,
CHAR, Text,
VARCHAR, Integer, String)
metadata = MetaData()
user = Table('users',
metadata,
Column('u_id', Integer, primary_key=True),
Column('u_name', Text),
Column('u_fullname', Text),
Column('u_password', Text))
第二步骤:定义映射类
class User(object):
def __init__(self, u_name, u_fullname, u_password):
self.u_name = u_name
self.u_fullname = u_fullname
self.u_password = u_password
第三步骤:绑映射描述
mapper(User, user)
说明:上面的方式是SB的原始方式,当大家都在乐此不疲的定义描述表,定义类再实现ORM的时候,SQLAlchemy团队搞出更加简单的映射方法,就是现代模式,只需要定义映射类就可以一次性完成所有的任务
2.现代映射模式
1.为了定义的类能够被SQLAlchemy管理,所以引入第三方类Declarative,我们所有定义的映射类都必须是它的子类
第一步骤:生成公共基类
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
说明:一个程序内,基类最好的唯一的,建议存储在全局变量如上Base中供所有映射类使用,通过
基类可以定义N多的映射子类,这些子类都能被SQLAlchemy Declarative管理
第二步骤:定义映射类
from sqlalchemy import (Column, CHAR, VARCHAR,
Integer, String, Text)
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
u_id = Column(Integer, primary_key=True)
u_name = Column(Text)
u_fullname = Column(Text)
u_password = Column(Text)
def __init__(self, u_name, u_fullname, u_password):
self.u_name = u_name
self.u_fullname = u_fullname
self.u_password = u_password
def __repr__(self):
return '<User(%s, %s, %s)>' % (self.u_name,
self.u_fullname,
self.u_password)
说明:如上代码就完成了先前经典模式所需三步,代码比原先更简洁和容易管理,同刚才经典模式的Table定义的Column,这个代表数据库表中的列,Text/Integer代表数据库表的字段类型,这样User类就建立起与数据库表的映射,真实的名字可以使用__tablename__指明,然后是表列的集合,包括id,name,fullname,password,通过primary_key=True已经指明id为主键,ORM为了能够实际映射表需要至少一个列被定义为主键列,多列当然也支持,其实上面的魔术方法__init__()/__repr__()都是可选的,可以手动加入任意方法或属性,唯一需要注意的是必须继承Base基类,保证SQLAlchary Declarative可管理这些映射类和数据库表.随着User映射类通过Declaractive_base系统构造成功,我们就拥有了相关的定义信息,例如经典模式中的Table()描述,也包含映射到表中的类,User自身,可以通过User.__table__来获取表描述,User.__mapper__来获取映射对象,Base.metedata来获取metedata对象,如果想将定义好数据结构实现为实体数据库,只需要Base.metedata.create_all(engine)即可
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://my.oschina.net/pydevops/
# Purpose:
#
"""
import pprint
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, Text, create_engine
def main():
"""Main function.
"""
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
u_id = Column(Integer, primary_key=True)
u_name = Column(Text)
u_fullname = Column(Text)
u_password = Column(Text)
# 获取经典模式下的Table()表描述信息
pprint.pprint(repr(User.__table__))
# 获取经典模式下mapper()表和类映射关系
pprint.pprint(repr(User.__mapper__))
# 将表结构写入数据库
sql_conn_str = 'mysql+mysqldb://root:root@127.0.0.1/xmdevops?charset=utf8'
sql_engine = create_engine(sql_conn_str, echo=True)
Base.metadata.create_all(sql_engine)
if __name__ == '__main__':
main()
会话创建:
1.创建类的实例,declearative会自动检测子类构造方法,如果没有定义会自动定义,主键默认为None,因为数据对象未持久化存储
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://my.oschina.net/pydevops/
# Purpose:
#
"""
import pprint
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, Text, create_engine
def main():
"""Main function.
"""
# 创建基类
Base = declarative_base()
# 定义表结构
class User(Base):
__tablename__ = 'users'
u_id = Column(Integer, primary_key=True)
u_name = Column(Text)
u_fullname = Column(Text)
u_password = Column(Text)
""" 可以省略__init__()构造方法,因为Base基类会检测,缺少构造方法会自动补上,
但其提供的构造方法建议使用键值对的参数访问方式,包含我们用Column定义映射的列
"""
def __init__(self, u_name, u_fullname, u_password):
self.u_name = u_name
self.u_fullname = u_fullname
self.u_password = u_password
# 实例映射类(id也可以传入,通常意义上这类主键由系统自动维护,无需为其赋值)
user_manman = User(u_name='lmm', u_fullname='limanman', u_password='lm_521314_lz')
# id为None是因为没有对对象持久化存储
print user_manman.u_id
print user_manman.u_name
print user_manman.u_fullname
print user_manman.u_password
if __name__ == '__main__':
main()
2.ORM的操作句柄被称为会话SESSION,为了使用会话需要首先创建会话实例(先建立引擎然基于引擎创建会话,其实先建立会话再绑定引擎也是可以滴)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://my.oschina.net/pydevops/
# Purpose:
#
"""
import pprint
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
def main():
"""Main function.
"""
sql_conn_str = 'mysql+mysqldb://root:root@127.0.0.1/xmdevops?charset=utf8'
# 方法一: 先创建引擎再基于引擎创建会话
sql_engine = create_engine(sql_conn_str)
SQLSession = sessionmaker(bind=sql_engine)
sql_session = SQLSession()
# 方法二: 先创建会话再绑定引擎
SQLSession = sessionmaker()
SQLSession.configure(bind=sql_engine)
sql_session = SQLSession()
if __name__ == '__main__':
main()
说明:到此就获取了由engine维护的数据库连接池,并且会维持内存中的映射数据直到commit提交或是更改或是关闭会话对象
添加对象:
1.要想将映射到实体表中的映射类如User持久化,需要将这个User类建立的对象实例添加到上面创建好的会话实例中
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://my.oschina.net/pydevops/
# Purpose:
#
"""
import pprint
# 创建基类
from sqlalchemy.ext.declarative import declarative_base
# 创建会话类
from sqlalchemy.orm import sessionmaker
# 创建表结构
from sqlalchemy import Text, Integer, Column, create_engine
Base = declarative_base()
# 定义uses表结构
class User(Base):
__tablename__ = 'users'
u_id = Column(Integer, primary_key=True)
u_name = Column(Text)
u_fullname = Column(Text)
u_password = Column(Text)
# 省略__init__()构造,Base自动构造
# 获取表结构(比默认打印更直观)
def __repr__(self):
return '<User(%s, %s, %s)>' % (self.u_name,
self.u_fullname,
self.u_password)
# 创建数据引擎
def create_sql_engine(uname, upass, rhost, rdatabase,
rport=3306, rcharset='utf8'):
"""Create database engine from common use.
Args:
uname : sql user name
upass : sql user pass
rhost : remote sql host
rport : remote sql port
rdatabase: remote sql database
rcharset : remote sql charset, default utf8
Returns:
engine
"""
sql_conn_str = 'mysql+mysqldb://%s:%s@%s:%s/%s?charset=%s' % (uname,
upass,
rhost,
rport,
rdatabase,
rcharset)
sql_engine = create_engine(sql_conn_str, echo=True)
return sql_engine
# 创建会话实例
def create_orm_session(sql_engine):
"""Create database session base sql engine.
Returns:
session
"""
SQLSession = sessionmaker(bind=sql_engine)
sql_session = SQLSession()
return sql_session
# 创建表结构
def create_table_struct(sql_engine):
"""Create table struct above.
Args:
sql_engine: sql engine api
Returns:
None
"""
Base.metadata.create_all(sql_engine)
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 一次添加单个对象
usr_lmm = User(u_name='lmm', u_fullname='limanman', u_password='lm_521314_lz')
orm_session.add(usr_lmm)
# 一次添加多个对象
orm_session.add_all([
User(u_name='lmm1', u_fullname='limanman1', u_password='lm_521314_lz1'),
User(u_name='lmm2', u_fullname='limanman2', u_password='lm_521314_lz2'),
User(u_name='lmm3', u_fullname='limanman3', u_password='lm_521314_lz3')
])
# 查询测试LazyLoad策略
orm_session.query(User).filter_by(u_name='lmm').first()
说明:SQLAlchemy采用Lazyload策略,add到orm_session此时usr_lmm只是被标记为Pending准备状态,并没有执行任何可能导致数据库变化的SQL语句,只有查询对象,对象的一个属性或显示调用flush时才会将Pending状态的数据写入数据表,如下我们用Query对象结合日志来说明,Query对象返回的是一个User实例,就是我们之前持久化的.由于我们指定的echo=True,如下的调试信息中可以看到其实先Insert然后再Select,也就是说你在实际操作持久化数据时才会由延迟加载真正触发数据库操作
2016-04-22 19:35:58,393 INFO sqlalchemy.engine.base.Engine INSERT INTO users (u_name, u_fullname, u_password) VALUES (%s, %s, %s)
2016-04-22 19:35:58,393 INFO sqlalchemy.engine.base.Engine ('lmm', 'limanman', 'lm_521314_lz')
2016-04-22 19:35:58,396 INFO sqlalchemy.engine.base.Engine INSERT INTO users (u_name, u_fullname, u_password) VALUES (%s, %s, %s)
2016-04-22 19:35:58,397 INFO sqlalchemy.engine.base.Engine ('lmm1', 'limanman1', 'lm_521314_lz1')
2016-04-22 19:35:58,400 INFO sqlalchemy.engine.base.Engine INSERT INTO users (u_name, u_fullname, u_password) VALUES (%s, %s, %s)
2016-04-22 19:35:58,401 INFO sqlalchemy.engine.base.Engine ('lmm2', 'limanman2', 'lm_521314_lz2')
2016-04-22 19:35:58,403 INFO sqlalchemy.engine.base.Engine INSERT INTO users (u_name, u_fullname, u_password) VALUES (%s, %s, %s)
2016-04-22 19:35:58,403 INFO sqlalchemy.engine.base.Engine ('lmm3', 'limanman3', 'lm_521314_lz3')
2016-04-22 19:35:58,410 INFO sqlalchemy.engine.base.Engine SELECT users.u_id AS users_u_id, users.u_name AS users_u_name, users.u_fullname AS users_u_fullname, users.u_password AS users_u_password
FROM users
WHERE users.u_name = %s
LIMIT %s
2016-04-22 19:35:58,410 INFO sqlalchemy.engine.base.Engine ('lmm', 1)
<User(lmm, limanman, lm_521314_lz)>
2.如果要将orm缓存数据显式的提交反馈到数据库里,需要调用orm_session.commit()告诉session目前的添加和改动,操作完成后session引用的数据库连接资源被回收到连接池,接下来对于这个session的操作会触发一个新的事物申请新的连接池连接资源
3.session是作为事务来工作,所以我们可以回滚先前所做的更改,查询时首先会在标识映射表中查询,查询不到才会去实体数据库中查找
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 一次添加单个对象
usr_lmm = User(u_name='lmm', u_fullname='limanman', u_password='lm_521314_lz')
# 将usr_lmm在标识映射表中标识为Pending状态
orm_session.add(usr_lmm)
# 更改实例属性
usr_lmm.u_name = 'lzz'
# 依据名称查询,触发持久化操作,将Pending状态的对象写入数据库,但此时并没commit,所以实体数据库中必然没有记录
print '------------------------------------------------------'
print orm_session.query(User).filter_by(u_name='lzz').first()
print '------------------------------------------------------'
# 回滚之前的更改
orm_session.rollback()
# 回滚后数据未持久化,此时查询的数据来自于实体数据库
print orm_session.query(User).filter_by(u_name='lmm').first()
多表关联:
1.实际生产中数据存储多表联查,常用的1-N(一对多)关系(再创建一个邮件表,每个用户可以关联一个或是多个邮件地址)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://my.oschina.net/pydevops/
# Purpose:
#
"""
import pprint
# 创建基类
from sqlalchemy.ext.declarative import declarative_base
# 创建会话类
from sqlalchemy.orm import sessionmaker, aliased, relationship, backref
# 创建表结构
from sqlalchemy import Text, Integer, Column, create_engine, func, ForeignKey
Base = declarative_base()
# 定义uses表结构
class User(Base):
__tablename__ = 'users'
u_id = Column(Integer, primary_key=True)
u_name = Column(Text)
u_fullname = Column(Text)
u_password = Column(Text)
# 省略__init__()构造,Base自动构造
# 获取表结构(比默认打印更直观)
def __repr__(self):
return '<User(%s, %s, %s)>' % (self.u_name,
self.u_fullname,
self.u_password)
# 定义Address表结构
class Address(Base):
__tablename__ = 'addresses'
a_id = Column(Integer, primary_key=True)
a_email = Column(Text, nullable=False)
u_id = Column(Integer, ForeignKey('users.u_id'))
"""
通过relationship实现Address.users获取用户实例集,back_populates='addresses'
必须双向指定,backref='addresses'作用相同但是可以单向指定,双向引用,addresses任意指定
主要是通过User.addresses获取邮件实例集
"""
users = relationship('User', backref='addresses')
def __repr__(self):
return '<Address(%s)>' % (self.a_email)
# 创建数据引擎
def create_sql_engine(uname, upass, rhost, rdatabase,
rport=3306, rcharset='utf8'):
"""Create database engine from common use.
Args:
uname : sql user name
upass : sql user pass
rhost : remote sql host
rport : remote sql port
rdatabase: remote sql database
rcharset : remote sql charset, default utf8
Returns:
engine
"""
sql_conn_str = 'mysql+mysqldb://%s:%s@%s:%s/%s?charset=%s' % (uname,
upass,
rhost,
rport,
rdatabase,
rcharset)
sql_engine = create_engine(sql_conn_str, echo=True)
return sql_engine
# 创建会话实例
def create_orm_session(sql_engine):
"""Create database session base sql engine.
Returns:
session
"""
SQLSession = sessionmaker(bind=sql_engine)
sql_session = SQLSession()
return sql_session
# 创建表结构
def create_table_struct(sql_engine):
"""Create table struct above.
Args:
sql_engine: sql engine api
Returns:
None
"""
Base.metadata.create_all(sql_engine)
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 添加用户记录
usr_lmm = User(u_name='lmm', u_fullname='limanman', u_password='lm_521314_lm')
# 添加地址对象
usr_lmm.addresses = [
Address(a_email='1784763743@qq.com'),
Address(a_email='xmdevops@vip.qq.com'),
]
# 添加到会话,提交
orm_session.add(usr_lmm)
orm_session.commit()
# 通过正反向关联获取用户信息
last_user = orm_session.query(User).filter(User.u_name=='lmm').all()[-1]
print 'user: %s' % (last_user.u_name)
for cur_a_email in last_user.addresses:
print cur_a_email.a_email
2.实际生产中数据存储多表联查,常用的N-N(多对多)关系(创建一个主机表和主机组表,一个主机可以属于多个表,一个主机组可以包含多个主机)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
#
# Authors: limanman
# OsChina: http://my.oschina.net/pydevops/
# Purpose:
#
"""
import pprint
# 创建基类
from sqlalchemy.ext.declarative import declarative_base
# 创建会话类
from sqlalchemy.orm import sessionmaker, aliased, relationship, backref
# 创建表结构
from sqlalchemy import (Text, Integer, String, Table,
Column, create_engine, func, ForeignKey)
Base = declarative_base()
# 主机-2-主机组类
# host_id + group_id 唯一不重复(一个主机可以属于多个主机组),关联另外两个表
host_2_group = Table('host_2_group', Base.metadata,
Column('host_id', Integer, ForeignKey('host.id'),
autoincrement=False, primary_key=True),
Column('group_id', Integer, ForeignKey('group.id'),
autoincrement=False, primary_key=True))
# 主机类
class Host(Base):
__tablename__ = 'host'
id = Column(Integer,
unique=True,
nullable=False,
autoincrement=True,
primary_key=True)
hostname = Column(String(32),
nullable=False)
haddress = Column(String(128),
nullable=False)
hostport = Column(Integer,
nullable=False)
password = Column(String(64),
nullable=True)
auth_key = Column(String(256),
nullable=True)
# User基于host_2_group来关联group表,backref反向引用,则Group基于host_2_group来关联host表
groups = relationship('Group', backref='hosts', secondary=host_2_group)
def __repr__(self):
return ('<Host(id: %s hostname: %s haddress: %s password: %s'
'hostport: %s auth_key: %s)>') % ( self.id, self.hostname,
self.haddress, self.hostport,
self.password, self.auth_key)
# 主机组类
class Group(Base):
__tablename__ = 'group'
id = Column(Integer,
unique=True,
nullable=False,
autoincrement=True,
primary_key=True)
groupname = Column(String(32),
nullable=False)
def __repr__(self):
return '<Group(id: %s groupname: %s)>' % (self.id, self.groupname)
# 创建数据引擎
def create_sql_engine(uname, upass, rhost, rdatabase,
rport=3306, rcharset='utf8'):
"""Create database engine from common use.
Args:
uname : sql user name
upass : sql user pass
rhost : remote sql host
rport : remote sql port
rdatabase: remote sql database
rcharset : remote sql charset, default utf8
Returns:
engine
"""
sql_conn_str = 'mysql+mysqldb://%s:%s@%s:%s/%s?charset=%s' % (uname,
upass,
rhost,
rport,
rdatabase,
rcharset)
sql_engine = create_engine(sql_conn_str, echo=True)
return sql_engine
# 创建会话实例
def create_orm_session(sql_engine):
"""Create database session base sql engine.
Returns:
session
"""
SQLSession = sessionmaker(bind=sql_engine)
sql_session = SQLSession()
return sql_session
# 创建表结构
def create_table_struct(sql_engine):
"""Create table struct above.
Args:
sql_engine: sql engine api
Returns:
None
"""
Base.metadata.create_all(sql_engine)
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 添加主机组
#xm_newnatserver = Group(groupname='newnatserver')
#xm_statushelper = Group(groupname='statushelper')
#orm_session.add_all([
#xm_newnatserver,
#xm_statushelper
#])
# 添加主机
#group_instances = orm_session.query(Group).all()
#for grp_instance in group_instances:
#for i in xrange(1, 3):
#h_hostname = 'xm-server-%s' % (i)
#cur_host = Host(hostname=h_hostname,
#haddress='127.0.0.1',
#hostport=22,
#password='lm_521314_lz',
#auth_key=None)
## 关联主机和主机组
#cur_host.groups = [grp_instance,]
#orm_session.add(cur_host)
# 查询每个主机组中的主机
for cur_grp in orm_session.query(Group).all():
grp_hosts = cur_grp.hosts
print '%s' % (cur_grp),
for cur_host in grp_hosts:
print '''
%s''' % (cur_host),
# 查询每个主机所属主机组
for cur_host in orm_session.query(Host).all():
usr_group = cur_host.groups
print '%s -> %s' % (cur_host, usr_group)
orm_session.commit()
增删查改:
1.Query对象其实返回的是上面添加到session里面的可迭代的实例对象列表,Query对象的每一步操作依然返回Query对象,体现了它串联强大特性
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
for cur_instance in orm_session.query(User).order_by(User.u_id):
print '''%s
u_id : %s
u_name : %s
u_fullname: %s
u_password: %s''' % (cur_instance, cur_instance.u_id,
cur_instance.u_name,
cur_instance.u_fullname, cur_instance.u_password)
# 增加记录项 - add(instance)/add_all([instance1,...,instancen])
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 添加多项记录
orm_session.add_all([
User(u_name='lmm', u_fullname='limanman', u_password='lm_521314_lm'),
User(u_name='lzz', u_fullname='liuzhen', u_password='lz_521314_lz'),
])
orm_session.commit()
# 删除记录项 - filter+delete删除过滤项
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 删除符合条件的记录
from sqlalchemy import or_
orm_session.query(User).filter(or_(User.u_name == 'lmm',
User.u_name == 'lzz',)).delete()
orm_session.commit()
# 查询指定列 - 返回元组类型
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 查询指定的列时返回的是元组
for (cur_inst,
u_name,
u_fullname,
u_password) in orm_session.query(User, User.u_name,
User.u_fullname, User.u_password):
print '''%s
u_name : %s
u_fullname: %s
u_password: %s''' % (cur_inst, u_name, u_fullname, u_password)
# 设置列别名 - 返回元组类型
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 查询指定的列时返回的是元组
for u_name in orm_session.query(User.u_name.label('name')).all():
print u_name
# 设置类别名 - 多次引用起别名
from sqlalchemy.orm import aliased
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 起别名
u = aliased(User, name='u')
# 查询指定的列时返回的是元组
for (u_inst,
u_name,
u_fullname,
u_password) in orm_session.query(u, u.u_name,
u.u_fullname,
u.u_password):
print '''%s
u_name : %s
u_fullname: %s
u_password: %s''' % (u, u_name, u_fullname, u_password)
# 排序限制 - 返回列表类型
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 结合order_by一起使用,数组分片实现
for cur_instance in orm_session.query(User).order_by(User.u_id)[1:]:
print '''%s
u_name : %s
u_fullname: %s
u_password: %s''' % (cur_instance,
cur_instance.u_name,
cur_instance.u_fullname,
cur_instance.u_password)
# 返回所有行
pprint.pprint(orm_session.query(User).filter(User.u_name.like('lmm%')).all())
# 返回首行
pprint.pprint(orm_session.query(User).filter(User.u_name.like('lmm%')).first())
# 筛选过滤 - filter_by关键词参数过滤/filter支持PY自身操作符,更强大
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 相等测试
pprint.pprint(orm_session.query(User).filter(User.u_name=='lmm').all())
# 不等测试
pprint.pprint(orm_session.query(User).filter(User.u_name!='lmm').all())
# like测试
pprint.pprint(orm_session.query(User).filter(User.u_name.like('lmm%')).all())
# in 测试
pprint.pprint(orm_session.query(User).filter(User.u_name.in_(['lmm1', 'lmm2', 'lmm3'])).all())
# not in 测试
pprint.pprint(orm_session.query(User).filter(~User.u_name.in_(['lmm1', 'lmm2', 'lmm3'])).all())
# is null测试
pprint.pprint(orm_session.query(User).filter(User.u_name == None).all())
# is not null测试
pprint.pprint(orm_session.query(User).filter(User.u_name != None).all())
# and 测试
# 方法一: and_
from sqlalchemy import and_
pprint.pprint(orm_session.query(User).filter(and_(User.u_name=='lmm', User.u_fullname=='limanman')).all())
# 方法二: 再次filter Query对象
pprint.pprint(orm_session.query(User).filter(User.u_name=='lmm').filter(User.u_fullname=='limanman').all())
# or 测试
from sqlalchemy import or_
pprint.pprint(orm_session.query(User).filter(or_(User.u_name=='lmm', User.u_name.in_(['lmm1', 'lmm2', 'lmm3']))).all())
# 原义SQL - SQL查询字符串作为查询参数,还支持绑定参数
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 绑定参数基于字符串的SQL指派,使用冒号来标记代替参数,然后用params方法指定相应的值
pprint.pprint(orm_session.query(User).filter('u_name=:name and u_fullname=:fullname').params(name='lmm', fullname='limanman').all())
# 极端SQL - 直接使用SQL语句脱离orm框架,不建议使用
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 绑定参数基于SQL语句,使用冒号来标记代替参数,然后用params方法指定相应的值
pprint.pprint(orm_session.query(User).from_statement('select * from users where u_name=:name and u_fullname=:fullname').params(name='lmm', fullname='limanman').all())
# SQL计数 - 返回整型类型
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 返回结果集行数
pprint.pprint(orm_session.query(User).filter(User.u_name.in_(['lmm1', 'lmm2', 'lmm3'])).count())
或
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 返回结果集行数
pprint.pprint(orm_session.query(func.count(User.u_id)).scalar())
# 分组统计 - 返回列表类型
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 按照每个用户分组,统计用户数
pprint.pprint(orm_session.query(User.u_name, func.count(User.u_name).label('u_count')).group_by(User.u_name).all())
# 左右联查 - inner join + group_by
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 添加用户记录
usr_lmm = User(u_name='lmm', u_fullname='limanman', u_password='lm_521314_lm')
# 添加地址对象
usr_lmm.addresses = [
Address(a_email='1784763743@qq.com'),
Address(a_email='xmdevops@vip.qq.com'),
]
# 添加到会话,提交
orm_session.add(usr_lmm)
orm_session.commit()
# inner join - 引用外键表,自动找外键
pprint.pprint(orm_session.query(User).join(User.addresses).all())
# inner join - 手动指定表,自动找外键
pprint.pprint(orm_session.query(User).join(Address).all())
# 连接 - 排序 - 分组
pprint.pprint(orm_session.query(User.u_name, func.count(Address.a_email)).join(Address).filter(User.u_id==Address.u_id).group_by(User.u_name).all())
# 修改记录项 - update更新过滤项
if __name__ == '__main__':
sql_engine = create_sql_engine('root', 'root', '127.0.0.1', 'xmdevops')
create_table_struct(sql_engine)
orm_session = create_orm_session(sql_engine)
# 修改记录项
orm_session.query(User).filter(User.u_name=='lmm').update({
'u_password': 'lz_521314_lz',
})
orm_session.commit()
reference:
https://my.oschina.net/pydevops/blog/664872
发表评论
-
python 类装饰器的四种场景写法
2018-01-29 17:32 646总结: 1类装饰器用于类函数 __get__ ... -
python logging 重复写日志问题
2018-01-23 14:47 466用Python的logging模块记录日志时,遇到了重复记录 ... -
CSS 黑魔法小技巧,让你少写不必要的JS,代码更优雅
2018-01-11 17:19 1090之前不久,由于自己平时涉猎还算广泛,总结了一篇博客: 这些J ... -
Redis 数据类型及应用场景
2018-01-10 17:56 746一、 redis 特点 所有数据存储在内存中,高速读写 ... -
数据可视化的开源方案: Superset vs Redash vs Metabase
2018-01-10 17:29 3350人是视觉动物,要用数 ... -
Nginx 限制单个IP的并发连接数/速度来减缓垃圾蜘蛛爬虫采集
2018-01-10 15:52 1074不知道大家碰没碰到过被一些垃圾蜘蛛爬虫采集造成服务器飙升的情 ... -
docker 升级到最新版
2018-01-10 14:14 827步骤 卸载旧版 docker > rp ... -
PyMongo 常见问题
2018-01-09 16:11 1380这是一篇翻译文章, ... -
Python实战mongodb第3篇: Pymongo的分页查询
2018-01-08 11:25 1854Pymongo的分页查询 -
SQL转 MongoDB语法速查表
2018-01-06 17:34 2234SQL转 MongoDB语法速查表 翻译整理:qqxufo ... -
Logstash由于时区导致8小时时差解决方案
2018-01-03 15:10 5784写在最前面, 这个logstash解决时差的原理是利用ti ... -
Kombu 源码解析一
2017-12-25 16:33 822Kombu 源码解析一 玩 Python 的同学可能很多都 ... -
分布式SQL查询引擎Presto原理介绍
2017-12-21 13:58 1769我们实时引擎组新引入了一款分布式SQL查询引擎,名字叫Pre ... -
一行Python代码实现树结构
2017-12-20 15:47 799树结构是一种抽象数据类型,在计算机科学领域有着非常广泛的应用 ... -
命令行神器 Click
2017-12-20 14:57 639Click Click 是用 Python 写的一个第三方 ... -
django 流式大文件文件下载
2017-12-16 15:20 1886写在最前面: 不要用ajax请求下载文件 ... -
filebeat to elasticsearch针对于filebeat端性能优化--性能提升230%
2017-12-14 17:04 23081. filebeat介绍 filebeat最 ... -
re必杀技正则表达式大全——包括校验数字、字符、一些特殊的需求等等
2017-12-13 15:42 660一、校验数字的表达式 1数字:^[0-9]*$ ... -
n种elasticsearch按照日期定时批量删除索引
2017-12-12 15:30 2670使用elkstack作为 ... -
10个Python面试常问的问题
2017-12-07 10:48 499类继承 有如下的一段代码: clas ...
相关推荐
3. **模块和包**:Python的模块和包机制使得代码复用和组织更加有序,书中会讲解如何导入和使用模块,以及如何创建和组织自己的模块和包。 4. **异常处理**:异常处理是编程中的重要部分,书中通过实例解释了try/...
理解函数、模块和包的概念及其使用。 掌握异常处理和文件操作。 进阶学习: 学习面向对象编程(OOP)的原则和实践。 深入理解Python的高级特性,如迭代器、生成器、装饰器等。 学习Python的标准库,了解常用的模块和...
1. **基础语法**:变量、数据类型(如整型、浮点型、字符串、布尔型、列表、元组、字典、集合)、控制结构(if语句、for循环、while循环、break和continue)、函数定义与调用、模块和包的导入。 2. **对象和类**:...
5. **模块和包**:了解如何导入和使用Python模块,以及如何组织代码为可重用的库。 6. **面向对象编程**:掌握类的定义、对象的创建、继承、封装和多态等概念。 7. **文件操作**:学习如何打开、读取、写入和关闭...
3. **模块和包**:Python的模块和包机制是其代码复用和组织的重要方式。书中会介绍如何编写模块,导入和使用标准库及第三方库,以及如何创建和使用自定义包。 4. **异常处理**:异常处理是程序健壮性的重要部分。书...
9. **模块和包管理**:理解import机制,以及如何使用pip进行包的安装、升级和卸载。 10. **性能优化**:了解Python的性能瓶颈,掌握使用timeit模块进行性能测试,以及使用列表切片、生成器表达式等方法优化代码。 ...
3. **模块和包**:了解如何导入和使用Python的内置模块,以及如何创建和组织自定义模块和包,这对于编写可维护的大型代码至关重要。 4. **异常处理**:学习如何使用try/except语句来捕获和处理程序运行时可能出现的...
3. **模块和包**:了解如何导入和使用Python的内置模块,以及如何创建自定义模块和包,以实现代码的复用和组织。 4. **异常处理**:学习Python的错误和异常处理机制,如何使用try/except/finally来捕获并处理运行时...
5. **模块和包**:理解如何导入和使用外部模块,以及模块和包的组织结构。 6. **字符串操作**:如字符串的拼接、查找、替换、分割等。 7. **范围(Scope)和作用域**:理解全局变量和局部变量的区别,以及`global`...
3. **模块和包**:介绍如何编写、导入和管理Python模块,以及如何组织代码为包,提高代码的复用性和可维护性。 4. **标准库的使用**:涵盖Python标准库中的部分常用模块,如os、sys、math、datetime等,展示它们在...
9. **模块和包**:理解Python的模块化编程,如何创建和组织自己的代码,以及使用包来管理大型项目。 10. **高级话题**:涵盖生成器、上下文管理器、装饰器等Python的高级特性,以及如何编写元类和自定义迭代器。 ...
3. **模块和包**:讲解如何组织代码,导入和创建模块,以及如何使用包管理代码库。 4. **面向对象编程**:介绍类和对象的概念,继承、封装和多态性,以及Python中的特殊方法(如`__init__`和`__del__`)。 5. **...
1. Python基础:包括变量、数据类型、控制流语句、函数定义、模块和包的使用等。 2. 高级Python:如装饰器、生成器、上下文管理器、元类等。 3. 数据结构:列表、元组、字典、集合、堆栈和队列等的应用。 4. 文件...
Python库通常包含一系列模块和包,模块是包含Python定义和语句的文件,而包则是一种组织模块的方式,通过使用点号(.)进行命名,例如`my_package.module1`。在`tractor`库中,可能包含各种类、函数和数据结构,供...
1. **Python基础**:这是所有Python学习者必经之路,包括变量、数据类型(如整型、浮点型、字符串、列表、元组、字典)、控制流(if-else、for、while循环)、函数定义和调用、模块和包的使用。 2. **面向对象编程*...
4. **模块与包**:掌握Python的模块化编程,如何导入和使用标准库及第三方库,以及如何创建自己的模块和包。 5. **标准库与第三方库**:学习Python标准库中的常用模块,如os、sys、datetime、random等,并了解如何...
- **模块与包**: 解释模块和包的概念,以及如何组织代码为模块和包,实现代码重用。 **3. 面向对象编程在Python中的应用** - **类与对象**: 定义类的基本语法,创建对象的过程,以及如何利用类来封装数据和行为。 ...
这部分可能会讲述如何组织代码为模块和包,导入规则,以及使用`__init__.py`文件。 第9部分可能涉及的是Python的高级特性和实战应用,比如: 1. **并发与并行**:Python提供了多线程、多进程以及异步I/O等实现并发...
10. **模块和包**:讲解Python的模块系统,包括导入模块,创建和使用自定义模块,以及了解标准库和第三方库的使用。 11. **数据库编程**:简要介绍如何使用Python连接和操作数据库,如SQLAlchemy或sqlite3。 12. *...