`

Python操作 RabbitMQ、Redis、Memcache、SQLAlchemy

阅读更多

       一、 Memcached

  1. Memcached安装和基本使用
  2. Python操作Memcached
    2.1 set
    2.2 集群操作
    2.3 add
    2.4 replace
    2.5 set 和 set_multi
    2.6 delete 和 delete_multi
    2.7 get 和 get_multi
    2.8 append 和 prepend
    2.9 decr 和 incr  
    2.10 gets 和 cas

二、redis

  1. 安装Redis
  2. 安装Python的redis模块
  3. redis模块介绍
    3.1 操作模式
    3.2 连接池
    3.3 数据操作
    3.3.1 通用操作
    3.3.2 String操作
    3.3.3 Hash操作
    3.3.4 List操作
    3.3.5 Set操作
    3.3.6 有序集合的操作
    3.3.7 管道
    3.3.8 发布订阅

三、rabbitMQ

3.1 RabbitMQ的安装
3.2 rabbitMQ工作机制
3.3 简单的rabbitMQ使用
3.4 发布订阅
3.5 路由/关键字
3.6 主题

四、SQLAlchemy

一、 Memcached

Memcached 是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载。它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态、数据库驱动网站的速度。Memcached基于一个存储键/值对的hashmap。其守护进程(daemon )是用C写的,但是客户端可以用任何语言来编写,并通过memcached协议与守护进程通信。

1.1 Memcached安装和基本使用

  • 安装Memcached:

    用wget 去http://memcached.org下载最新源码
    tar -zxvf memcached-x.x.x.tar.gz
    cd memcached-x.x.x
    ./configure && make && make test && sudo make install
    PS:依赖libevent,需要提前安装
    yum install libevent-devel
    apt-get install libevent-dev

  • 启动Memcached

    memcached -d -m 10 -u root -l 10.211.55.4 -p 12000 -c 256 -P /tmp/memcached.pid

    PS:如果你是在本机的虚拟机内测试,请将IP改为0.0.0.0
  • 参数说明:

    -d:启动一个守护进程
    -m:分配给Memcache使用的内存数量,单位是MB
    -u :运行Memcache的用户
    -l :监听的服务器IP地址
    -p:设置Memcache监听的端口,最好是1024以上的端口
    -c :最大运行的并发连接数,默认是1024,按照服务器的负载量来设定
    -P:设置保存Memcache的pid文件

  • Memcached命令

    存储命令: set/add/replace/append/prepend/cas
    获取命令: get/gets
    其他命令: delete/stats..

1.2 Python操作Memcached

安装API

1.2.1 set

set是最基本的操作,传入两个参数,第一个是name,第二个是这个name对应的value。

import memcache
 
mc = memcache.Client(['10.211.55.4:12000'], debug=True)
mc.set("foo", "bar")
ret = mc.get('foo')
print(ret)

debug = True 表示运行出现错误时,显示错误信息,上线后请移除该参数。

1.2.2 集群操作

python-memcached模块原生支持集群操作,其原理是在内存维护一个主机列表,且集群中主机的权重值和主机在列表中重复出现的次数成正比。

主机 权重
1.1.1.1 1
1.1.1.2 2
1.1.1.3 1

那么在内存中主机列表为:host_list = ["1.1.1.1", "1.1.1.2", "1.1.1.2", "1.1.1.3", ]
用户如果要在内存中创建一个键值对(如:k1 = "v1"),那么要执行以下步骤:

  1. 根据算法将 k1 转换成一个数字
  2. 将数字和主机列表长度求余数,得到一个值 N( 0 <= N < 列表长度 )
  3. 在主机列表中根据 第2步得到的值为索引获取主机,例如:host_list[N]
  4. 连接 将第3步中获取的主机,将 k1 = "v1" 放置在该服务器的内存中

代码如下:

mc = memcache.Client([('1.1.1.1:12000', 1), ('1.1.1.2:12000', 2), ('1.1.1.3:12000', 1)], debug=True)
 mc.set('k1', 'v1')
1.2.3 add

添加一条键值对。如果已经存在该 key,则弹出异常。

mc.add('k1', 'v1')
mc.add('k1', 'v2') # 报错,对已存在的key重复添加!!!
1.2.4 replace

修改某个key的值,如果key不存在,则异常.

mc.replace('kkkk','999')
1.2.5 set 和 set_multi

set: 设置一个键值对,如果key不存在,则创建,如果key存在,则修改
set_multi: 设置多个键值对,如果key不存在,则创建,如果key存在,则修改

mc.set('key0', 'jack') 
mc.set_multi({'key1': 'val1', 'key2': 'val2'})
1.2.6 delete 和 delete_multi

delete: 删除指定的一个键值对
delete_multi: 删除指定的多个键值对

mc.delete('key0')
mc.delete_multi(['key1', 'key2'])
1.2.7 get 和 get_multi

get: 获取一个键值对
get_multi : 获取多一个键值对

val = mc.get('key0')
item_dict = mc.get_multi(["key1", "key2", "key3"])
1.2.8 append 和 prepend

append : 修改指定key的值,在原来的值后面追加内容
prepend 修改指定key的值,在原来的值前面插入内容

# k1 = "v1" 
mc.append('k1', 'after')
# k1 = "v1after" 
mc.prepend('k1', 'before')
# k1 = "beforev1after"
1.2.9 decr 和 incr  

incr : 自增,将值增加 N ( N默认为1 )
decr :自减,将值减少 N ( N默认为1 )

mc.set('k1', '777') 
mc.incr('k1')
# k1 = 778 
mc.incr('k1', 10)
# k1 = 788 
mc.decr('k1')
# k1 = 787 
mc.decr('k1', 10)
# k1 = 777
1.2.10 gets 和 cas

使用缓存系统共享数据资源就必然绕不开数据争夺和脏数据的问题。举个例子:
假设商城某件商品的剩余个数保存在memcache中,product_count = 900
A用户刷新页面从memcache中读取到product_count = 900
B用户刷新页面从memcache中读取到product_count = 900

A、B用户均购买商品,并修改product_count的值:

A用户修改后 product_count=899
B用户修改后 product_count=899
如此一来缓存内的数据便不再正确,实际此时product_count应该等于898.
如果使用python的set和get来操作以上过程,那么程序就会如上述所示情况!

如果想要避免此情况的发生,需要使用 gets 和 cas ,如:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcache
mc = memcache.Client(['10.211.55.4:12000'], debug=True, cache_cas=True)
 
v = mc.gets('product_count')
# ...
# 如果有人在gets之后和cas之前修改了product_count,那么,下面的设置将会执行失败,剖出异常,从而避免非正常数据的产生
mc.cas('product_count', "899")

本质上每次执行gets时,会从memcache中获取一个自增的数字,通过cas去修改gets的值时,会携带之前获取的自增值和memcache中的自增值进行比较,如果相等,则可以提交;如果不相等,那表示在gets和cas执行之间,又有其他人执行了gets,则不允许修改。

二、redis

Redis是什么?先看其官方介绍:

Redis is an open source (BSD licensed), in-memory data structure store, used as database, cache and message broker. It supports data structures such as strings, hashes, lists, sets, sorted sets with range queries, bitmaps, hyperloglogs and geospatial indexes with radius queries. Redis has built-in replication, Lua scripting, LRU eviction, transactions and different levels of on-disk persistence, and provides high availability via Redis Sentinel and automatic partitioning with Redis Cluster.

巴拉巴拉一堆......
Redis是一个驻扎在内存中的数据存储结构,常用于数据库、缓存和消息代理。它采用先进的 key-value 存储方式,包括string(字符串)、list(链表)、set(集合)、zset(sorted set --有序集合)和hash(哈希类型)。这些数据类型都支持push/pop、add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的。在此基础上,redis支持各种不同方式的排序。与memcached一样,为了保证效率,数据都是缓存在内存中。区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步。
Redis的官方网站是:http://www.redis.io/

Redis 与其他同类软件相比有三个不同的特点:

  • Redis是完全在内存中保存数据的数据库,使用磁盘只是为了持久性目的;
  • Redis相比许多键值数据存储系统有相对丰富的数据类型;
  • Redis可以将数据复制到任意数量的从服务器中;

Redis有以下方面的优点:

  • 异常快速 : Redis是非常快的,每秒可以执行大约110000设置操作,81000个/每秒的读取操作。
  • 支持丰富的数据类型 : Redis支持最大多数开发人员已经知道如列表,集合,可排序集合,哈希等数据类型。
    *操作都是原子的 : 所有 Redis 的操作都是原子,从而确保当两个客户同时访问 Redis 服务器得到的是更新后的值(最新值)。
  • MultiUtility工具:Redis是一个多功能实用工具,可以在很多如:缓存,消息传递队列中使用(Redis原生支持发布/订阅)。

2.1 安装Redis

在 Ubuntu 上安装 Redis:

$sudo apt-get update

$sudo apt-get install redis-server

启动 Redis

$redis-server

查看 redis 是否在运行

$redis-cli

这将打开一个 Redis 提示符,如下所示:

redis 127.0.0.1:6379>

输入 PING 命令

redis 127.0.0.1:6379> ping
PONG

显示上面的内容则说明你已经成功地安装了 Redis。
PS:如果你是在本机的虚拟机内运行redis服务,请在/etc/redis.conf内将bind IP改为0.0.0.0

2.2 安装Python的redis模块

有多种方式可以安装redis模块:

sudo pip3 install redis
or
sudo easy_install redis
or
源码安装
详见:https://github.com/WoLpH/redis-py

redis模块的使用可以分类为:

  • 连接方式
  • 连接池
  • 操作
    • String 操作
    • Hash 操作
    • List 操作
    • Set 操作
    • Sort Set 操作
  • 管道
  • 发布订阅

2.3 redis模块介绍

2.3.1 操作模式

redis提供两个类Redis和StrictRedis用于实现Redis的命令,StrictRedis用于实现大部分官方的命令,并使用官方的语法和命令,Redis是StrictRedis的子类,用于向后兼容旧版本的redis-py。

#!/usr/bin/env python
# -*- coding:utf-8 -*- 
import redis
 
r = redis.Redis(host='10.211.55.4', port=6379)
r.set('foo', 'Bar')
print(r.get('foo'))

PS:请将10.211.55.4更换成你所在的redis服务器地址

2.3.2 连接池

redis使用connection pool来管理对一个redis server的所有连接,避免每次建立、释放连接的开销。默认情况下,每个Redis实例都会维护一个自己的连接池。可以直接建立一个连接池,然后作为参数实例化Redis对象,这样就可以实现多个Redis实例共享一个连接池。

import redis
 
pool = redis.ConnectionPool(host='10.211.55.4', port=6379)
 
r = redis.Redis(connection_pool=pool)
r.set('foo', 'Bar')
print()r.get('foo'))
2.3.3 数据操作
2.3.3.1 通用操作
  • delete(*names)
    删除redis中的任意数据类型

  • exists(name)
    检测redis的name是否存在

  • keys(pattern='*')
    根据正则模式获取redis的name

    KEYS * 匹配数据库中所有 key 。
    KEYS h?llo 匹配 hello , hallo 和 hxllo 等。
    KEYS h*llo 匹配 hllo 和 heeeeello 等。
    KEYS h[ae]llo 匹配 hello 和 hallo ,但不匹配 hillo

  • expire(name ,time)
    为某个name设置超时时间

  • rename(src, dst)
    对redis的name重命名

  • move(name, db))
    将redis的某个值移动到指定的db下

  • randomkey()
    随机获取一个redis的name(不删除)

  • type(name)
    获取name对应值的类型

  • scan(cursor=0, match=None, count=None)
  • scan_iter(match=None, count=None)
    用于增量迭代获取key

2.3.3.2 String操作

redis中的String在内存中按照一个name对应一个value来存储。

  • set(name, value, ex=None, px=None, nx=False, xx=False)
    在Redis中设置值,不存在则创建,存在则修改

    ex,过期时间(秒)
    px,过期时间(毫秒)
    nx,如果设置为True,则只有name不存在时,当前set操作才执行
    xx,如果设置为True,则只有name存在时,岗前set操作才执行

  • setnx(name, value)
    设置值

    只有name不存在时,执行设置操作(添加)

  • setex(name, value, time)
    设置值

    time,过期时间(数字秒 或 timedelta对象)

  • psetex(name, time_ms, value)
    设置值

    time_ms,过期时间(数字毫秒 或 timedelta对象)

  • mset(*args, **kwargs)
    批量设置值

    mset(k1='v1', k2='v2')
    mget({'k1': 'v1', 'k2': 'v2'})

  • get(name)
    获取值
  • mget(keys, *args)
    批量获取

    mget('ylr', 'jack')

    mget(['ylr', 'jack'])

  • getset(name, value)
    设置新值并获取原来的值

  • getrange(key, start, end)
    获取子序列(根据字节获取,非字符)

    key,Redis 中字符串的 name
    start,起始位置(字节)
    end,结束位置(字节)
    如: "张三丰" ,0-3表示 "张"

  • setrange(name, offset, value)
    修改字符串内容,从指定字符串索引开始向后替换(新值太长时,则向后添加)

    offset,字符串的索引,字节(一个汉字三个字节)
    value,要设置的值

  • setbit(name, offset, value)
    对name对应值的二进制表示的位进行操作

    name,redis的name
    offset,位的索引(将值变换成二进制后再进行索引)
    value,值只能是 1 或 0
    如果在Redis中有一个对应: n1 = "foo",
    那么字符串foo的二进制表示为:01100110 01101111 01101111
    如果执行 setbit('n1', 7, 1),则就会将第7位设置为1,
    那么最终二进制则变成 01100111 01101111 01101111,即:"goo"

  • getbit(name, offset)
    获取name对应值的二进制表示中的指定位的值 (0或1)
  • bitcount(key, start=None, end=None)
    获取name对应的值的二进制表示中 1 的个数

    key,Redis的name
    start,位起始位置
    end,位结束位置

  • bitop(operation, dest, *keys)
    获取多个值,并将值做位运算,将最后的结果保存至新的name对应的值

    operation,AND(并) ,OR(或) ,NOT(非) , XOR(异或)
    dest, 新的Redis的name
    *keys,要查找的Redis的name
    如:
    bitop("AND", 'new_name', 'n1', 'n2', 'n3')
    获取Redis中n1,n2,n3对应的值,然后将所有的值做位运算,然后将结果保存 new_name 对应的值中。

  • strlen(name)
    返回name对应值的字节长度(一个汉字3个字节)
  • incr(name, amount=1)
    自增 name对应的值,当name不存在时,则创建name=amount。

    name,Redis的name
    amount,自增数(必须是整数)
    同incrby

  • incrbyfloat(name, amount=1.0)
    自增 name对应的值,当name不存在时,则创建name=amount。

    name,Redis的name
    amount,自增数(浮点型)

  • decr(name, amount=1)
    自减 name对应的值,当name不存在时,则创建name=amount,否则,则自减。

    name,Redis的name
    amount,自减数(整数)

  • append(key, value)
    在name对应的值后面追加内容

    key, redis的name
    value, 要追加的字符串

2.3.3.3 Hash操作

redis中Hash在内存中的存储格式如下图:

  • hset(name, key, value)
    name对应的hash中设置一个键值对(不存在,则创建;否则,修改)

    key,name对应的hash中的key
    value,name对应的hash中的value

  • hsetnx(name, key, value)
    当name对应的hash中不存在当前key时,则创建(相当于添加)

  • hmset(name, mapping)
    在name对应的hash中批量设置键值对

    mapping,字典,如:{'k1':'v1', 'k2': 'v2'}
    r.hmset('xx', {'k1':'v1', 'k2': 'v2'})

  • hget(name,key)
    在name对应的hash中获取根据key获取value

  • hmget(name, keys, *args)

    在name对应的hash中获取多个key的值
    *args,要获取的key列表,如:['k1', 'k2', 'k3']
    keys,要获取的key,如:k1,k2,k3
    r.mget('xx', ['k1', 'k2'])
    print r.hmget('xx', 'k1', 'k2')

  • hgetall(name)
    获取name对应hash的所有键值

  • hlen(name)
    获取name对应的hash中键值对的个数

  • hkeys(name)
    获取name对应的hash中所有的key的值

  • hvals(name)
    获取name对应的hash中所有的value的值

  • hexists(name, key)
    检查name对应的hash是否存在当前传入的key

  • hdel(name,*keys)
    将name对应的hash中指定key的键值对删除

  • hincrby(name, key, amount=1)
    自增name对应的hash中的指定key的值,不存在则创建key=amount

  • hincrbyfloat(name, key, amount=1.0)
    自增name对应的hash中的指定key的值,不存在则创建key=amount

  • hscan(name, cursor=0, match=None, count=None)
    增量式迭代获取,对于大的数据非常有用,hscan可以实现分片的获取数据,并非一次性将数据全部获取完,从而防止内存被撑爆。

    cursor,游标(基于游标分批取获取数据)
    match,匹配指定key,默认None 表示所有的key
    count,每次分片最少获取个数,默认None表示采用Redis的默认分片个数
    例如:
    第一次:cursor1, data1 = r.hscan('xx', cursor=0, match=None, count=None)
    第二次:cursor2, data1 = r.hscan('xx', cursor=cursor1, match=None, count=None)
    ...
    直到返回值cursor的值为0时,表示数据已经通过分片获取完毕

  • hscan_iter(name, match=None, count=None)
    利用yield封装hscan创建生成器,实现分批去redis中获取数据

    match,匹配指定key,默认None 表示所有的key
    count,每次分片最少获取个数,默认None表示采用Redis的默认分片个数
    如:
    for item in r.hscan_iter('xx'):
    print item

2.3.3.4 List操作

redis中的List在内存中按照一个name对应一个List来存储。如图:

  • lpush(name,values)
    在name对应的list中添加元素,每个新的元素都添加到列表的最左边

    r.lpush('oo', 11,22,33)
    保存顺序为: 33,22,11

  • rpush(name, values)
    表示从右向左操作

  • lpushx(name,value)
    在name对应的list中添加元素,只有name已经存在时,值添加到列表的最左边

  • rpushx(name, value) 表示从右向左操作

  • llen(name)
    name对应的list元素的个数

  • linsert(name, where, refvalue, value))
    在name对应的列表的某一个值前或后插入一个新值

    where,BEFORE或AFTER
    refvalue,标杆值,即:在它前后插入数据
    value,要插入的数据

  • r.lset(name, index, value)
    对name对应的list中的某个索引位置重新赋值

    index,list的索引位置
    value,要设置的值

  • r.lrem(name, value, num)
    在name对应的list中删除指定的值

    value,要删除的值
    num, num=0,删除列表中所有的指定值;
    num=2,从前到后,删除2个;
    num=-2,从后向前,删除2个

  • lpop(name)
    在name对应的列表的左侧获取第一个元素并从列表中移除

  • rpop(name) 表示从右向左操作

  • lindex(name, index)
    在name对应的列表中根据索引获取列表元素

  • lrange(name, start, end)
    在name对应的列表分片获取数据

    start,索引的起始位置
    end,索引结束位置

  • ltrim(name, start, end)
    在name对应的列表中移除没有在start-end索引之间的值

  • rpoplpush(src, dst)
    从一个列表取出最右边的元素,同时将其添加至另一个列表的最左边

    src,要取数据的列表的name
    dst,要添加数据的列表的name

  • blpop(keys, timeout)
    将多个列表排列,按照从左到右去pop对应列表的元素

    keys,redis的name的集合
    timeout,超时时间,当元素所有列表的元素获取完之后,阻塞等待列表内有数据的时间(秒), 0 表示永远阻塞

  • brpop(keys, timeout)
    从右向左获取数据

  • brpoplpush(src, dst, timeout=0)
    从一个列表的右侧移除一个元素并将其添加到另一个列表的左侧

    src,取出并要移除元素的列表对应的name
    dst,要插入元素的列表对应的name
    timeout,当src对应的列表中没有数据时,阻塞等待其有数据的超时时间(秒),0 表示永远阻塞

  • 自定义的增量迭代
    由于redis类库中没有提供对列表元素的增量迭代,如果想要循环name对应的列表的所有元素,那么就需要获取name对应的所有列表并循环每个列表。但是,如果列表非常大,那么就有可能在第一步时就将内存撑爆,所以有必要自定义一个增量迭代的功能:

def list_iter(name):
    """    自定义redis列表增量迭代    :param name: redis中的name,即:迭代name对应的列表    :return: yield 返回 列表元素    """
    list_count = r.llen(name)
    for index in xrange(list_count):
        yield r.lindex(name, index)
 
# 使用
for item in list_iter('pp'):
    print(item)
2.3.3.5 Set操作

Set集合就是不允许重复的列表,其在redis的存储方式和列表一样。

  • sadd(name,values)
    name对应的集合中添加元素

  • scard(name)
    获取name对应的集合中元素个数

  • sdiff(keys, *args)
    获取多个集合的不同之处

  • sdiffstore(dest, keys, *args)
    分析 多个集合的不同之处,再将其加入到dest对应的集合中

  • sinter(keys, *args)
    获取多个name对应集合的并集

  • sinterstore(dest, keys, *args)
    获取多个name对应集合的并集,再将其加入到dest对应的集合中

  • sismember(name, value)
    检查value是否是name对应的集合的成员

  • smembers(name)
    获取name对应的集合的所有成员

  • smove(src, dst, value)
    将某个成员从一个集合中移动到另外一个集合

  • spop(name)
    从集合的右侧(尾部)移除一个成员,并将其返回

  • srandmember(name, numbers)
    从name对应的集合中随机获取 numbers 个元素

  • srem(name, values)
    在name对应的集合中删除某些值

  • sunion(keys, *args)
    获取多一个name对应的集合的并集

  • sunionstore(dest,keys, *args)
    获取多个name对应的集合的并集,并将结果保存到dest对应的集合中

  • sscan(name, cursor=0, match=None, count=None)
  • sscan_iter(name, match=None, count=None)
    同字符串的操作,用于增量迭代分批获取元素,避免内存消耗太大

2.3.3.6 有序集合的操作

在集合的基础上,为每个元素排序;元素的排序需要根据另外一个值来进行比较,所以,对于有序集合,每一个元素有两个值,即:值和分数,分数专门用来做排序。

  • zadd(name, *args, **kwargs)
    在name对应的有序集合中添加元素

    zadd('zz', 'n1', 1, 'n2', 2)
    zadd('zz', n1=11, n2=22)

  • zcard(name)
    获取name对应的有序集合元素的数量

  • zcount(name, min, max)
    获取name对应的有序集合中分数 在 [min,max] 之间的个数

  • zincrby(name, value, amount)
    自增name对应的有序集合的 name 对应的分数

  • r.zrange( name, start, end, desc=False, withscores=False, score_cast_func=float)
    按照索引范围获取name对应的有序集合的元素

    start,有序集合索引起始位置(非分数)
    end,有序集合索引结束位置(非分数)
    desc,排序规则,默认按照分数从小到大排序
    withscores,是否获取元素的分数,默认只获取元素的值
    score_cast_func,对分数进行数据转换的函数

  • zrevrange(name, start, end, withscores=False, score_cast_func=float)
    按照分数范围获取name对应的有序集合的元素
  • zrangebyscore(name, min, max, start=None, num=None, withscores=False, score_cast_func=float)
    从大到小排序
  • zrevrangebyscore(name, max, min, start=None, num=None, withscores=False, score_cast_func=float)

  • zrank(name, value)
    获取某个值在 name对应的有序集合中的排行(从 0 开始)
  • zrevrank(name, value),从大到小排序
  • zrangebylex(name, min, max, start=None, num=None)
    当有序集合的所有成员都具有相同的分值时,有序集合的元素会根据成员的 值 (lexicographical ordering)来进行排序,而这个命令则可以返回给定的有序集合键 key 中, 元素的值介于 min 和 max 之间的成员
    对集合中的每个成员进行逐个字节的对比(byte-by-byte compare), 并按照从低到高的顺序, 返回排序后的集合成员。 如果两个字符串有一部分内容是相同的话, 那么命令会认为较长的字符串比较短的字符串要大

    min,左区间(值)。 + 表示正无限; - 表示负无限; ( 表示开区间; [ 则表示闭区间
    min,右区间(值)
    start,对结果进行分片处理,索引位置
    num,对结果进行分片处理,索引后面的num个元素
    ZADD myzset 0 aa 0 ba 0 ca 0 da 0 ea 0 fa 0 ga
    r.zrangebylex('myzset', "-", "[ca") 结果为:['aa', 'ba', 'ca']

  • zrevrangebylex(name, max, min, start=None, num=None)
  • zrem(name, values)
    删除name对应的有序集合中值是values的成员

  • zremrangebyrank(name, min, max)
    根据排行范围删除

  • zremrangebyscore(name, min, max)
    根据分数范围删除

  • zremrangebylex(name, min, max)
    根据值返回删除

  • zscore(name, value)
    获取name对应有序集合中 value 对应的分数

  • zinterstore(dest, keys, aggregate=None)
    获取两个有序集合的交集,如果遇到相同值不同分数,则按照aggregate进行操作,aggregate的值为: SUM MIN MAX

  • zunionstore(dest, keys, aggregate=None)
    获取两个有序集合的并集,如果遇到相同值不同分数,则按照aggregate进行操作

  • zscan(name, cursor=0, match=None, count=None, score_cast_func=float)
  • zscan_iter(name, match=None, count=None,score_cast_func=float)
    同字符串相似,相较于字符串新增score_cast_func,用来对分数进行操作

2.3.3.7 管道

redis默认在执行每次请求时都会创建(连接池申请连接)和断开(归还连接池)一次连接操作,如果想要在一次请求中指定多个命令,可以使用pipline,并且默认情况下一次pipline 是原子性操作。

import redis
 
pool = redis.ConnectionPool(host='10.211.55.4', port=6379)
 
r = redis.Redis(connection_pool=pool) 
# pipe = r.pipeline(transaction=False)
pipe = r.pipeline(transaction=True) 
r.set('name', 'alex')
r.set('role', 'teacher') 
pipe.execute()
2.3.3.8 发布订阅

redis具有发布订阅功能。下面是一个简单案例:

发布者:服务器
订阅者:Dashboad和数据处理

  • 发布类源码
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import redis

class RedisHelper:

    def__init__(self):
        self.__conn = redis.Redis(host='10.211.55.4')
        self.chan_sub = 'fm104.5'
        self.chan_pub = 'fm104.5'

    def public(self, msg):
        self.__conn.publish(self.chan_pub, msg)
        return True

    def subscribe(self):
        pub = self.__conn.pubsub()
        pub.subscribe(self.chan_sub)
        pub.parse_response()
        return pub
  • 订阅者:
from monitor.RedisHelper import RedisHelper
 
obj = RedisHelper()
redis_sub = obj.subscribe() 
while True:
    msg= redis_sub.parse_response()
    print(msg)
  • 发布者:
from monitor.RedisHelper import RedisHelper
 
obj = RedisHelper()
obj.public('hello')

更多redis的资料请参考:
https://github.com/andymccurdy/redis-py/
http://doc.redisfans.com/

三、rabbitMQ

RabbitMQ官网

RabbitMQ是一个由erlang开发的基于AMQP(Advanced Message Queue )协议的开源实现。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面都非常的优秀。是当前最主流的消息中间件之一。
    
AMQP,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,同样,消息使用者也不用知道发送者的存在。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

3.1 RabbitMQ的安装

安装配置epel源
$ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
安装erlang
$ yum -y install erlang

安装RabbitMQ

$ yum -y install rabbitmq-server

注意:service rabbitmq-server start/stop

安装python中的rabbitMQ模块,名称为pika

pip install pika
or
easy_install pika
or
源码
https://pypi.python.org/pypi/pika

3.2 rabbitMQ工作机制

rabbitMQ主要有下面六种工作模式

上图中,深蓝色的X就是Exchange,红色的是Queue ,这两者都在 Server 端,又称作 Broker ,由rabbitMQ负责维护。左边的P是生产者,右边的C是消费者,它们通常由应用端自己创建,可以使用任何编程语言。
名词解释:

  • P: Producer,数据的发送方。
  • C:Consumer,数据的接收方。
  • Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
  • Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
  • Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
  • Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
  • vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
  • channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

rabbitMQ的使用流程概括如下:

  1. 生产者连接到消息队列服务器,打开一个channel。
  2. 生产者声明一个exchange,并设置相关属性。
  3. 生产者声明一个queue,并设置相关属性。
  4. 生产者使用routing key,在exchange和queue之间建立好绑定关系。
  5. 生产者投递消息到exchange。exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。
  6. 消费者的流程基本上前四步与生产者一样,只是最后通过消息队列获取消息。

3.3 简单的rabbitMQ使用

先看一个基于rabbitMQ的生产者消费者模型,熟悉一下。
生产者:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='hello')
 
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

消费者:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='hello')
 
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
 
channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)
 
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

参数:acknowledgment 消息不丢失

no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。主要是在消费者写这么一句:ch.basic_ack(delivery_tag = method.delivery_tag)

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='10.211.55.4'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print 'ok'
    ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

参数:durable 消息不丢失
上面那个是防止在消费者一端的数据丢失,而这个是防止在生产者端的丢失。通过持久化,可以保证数据的安全。主要是在生产者和消费者各自申明channel.queue_declare(queue='hello', durable=True)和生产者properties=pika.BasicProperties(delivery_mode=2,)就行了。

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()
# make message persistent
channel.queue_declare(queue='hello', durable=True)
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2, # make message persistent
                      ))
print(" [x] Sent 'Hello World!'")
connection.close()

关于消息获取顺序
默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数序列的任务,消费者2去队列中获取偶数序列的任务。
channel.basic_qos(prefetch_count=1) 则设置为谁来谁取,不再按照奇偶数排列。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print 'ok'
    ch.basic_ack(delivery_tag = method.delivery_tag)
# 关键在这里
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='hello',
                      no_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

以上都是简单的没有使用exchange的案例。
exchange在rabbitMQ中的作用可以简单的理解为网络中的路由机或者分发器。它有三种模式:

  • Publish/Subscribe:发布/订阅,参数:fanout
  • Routing:路由/关键字发送,参数:direct
  • Topic:主题,参数:topic

参数在声明exchange时由type指定。下面逐一介绍。

3.4 发布订阅

exchange type = fanout。
发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

发布者:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

订阅者:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

3.5 路由/关键字

exchange type = direct
RabbitMQ还支持根据关键字发送,即:队列绑定关键字,将数据绑定关键字发送到exchange,exchange根据关键字判定应该将数据发送至指定队列。

生产者:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

消费者:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

3.6 主题

exchange type = topic
在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将routing_key和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

  • # 表示可以匹配 0 个 或 多个 单词
  • * 表示只能匹配 一个 单词
发送者关键字           队列中路由值         结果
old.boy.python          old.*                       -- 不匹配
old.boy.python          old.#                       -- 匹配

生产者:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

消费者:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

四、SQLAlchemy

*SQLAlchemy是一个软件、一个模块,没有图形界面没有友好的选项,它只有自己一套独立的框架,有自己的定义方式和调用方式。不要指望它是标准化的,拿来就能用,你要使用它,就要从头学习它的各种功能和用法。它有很多独有的代码写法,而且是写死的,不要问为什么这样写,为什么是这个方法,为什么要定义这个类,没有那么多为什么,要么背下来,要么查手册,要么你根本不知道如何用它!我讨厌这种不标准的东西,学习成本太高,过两天它被淘汰了,换了一个又基本要从头学起!*

SQLAlchemy是Python编程语言下的一款ORM框架,该框架建立在数据库API之上,使用关系对象映射进行数据库操作,将对象转换成SQL,然后使用数据API执行SQL并获取执行结果。

SQLAlchemy本身无法操作数据库,依赖pymsql等第三方插件。它的Dialect部分就是用于和数据API进行交流的,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:

MySQL-Python
    mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
  
pymysql
    mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
  
MySQL-Connector
    mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
  
cx_Oracle
    oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
  
更多详见:http://docs.sqlalchemy.org/en/latest/dialects/index.html

在Dialect这里,确定你要使用什么API,这是标准,是写死的东西,不能改!
在SQLAlchemy的底层,它其实本质上是使用 Engine/ConnectionPooling/Dialect 进行数据库操作,Engine使用ConnectionPooling连接数据库,然后再通过Dialect执行SQL语句。这和直接使用pymysql模块没两样。真正体现SQLAlchemy威力的是ORM框架,联合使用 ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 所有组件对数据进行操作。根据类创建对象,对象转换成SQL,mysql服务器执行SQL,返回结果给SQLAlchemy,结果又被转换成对象,通过调用对象属性获取最终的值。
貌似从流程上来看,这套机制比直接使用原生SQL语句要复杂N倍,效率也要低很多,但它有很多好处是非常重要的:

  • python程序员不用是DBA
  • 不用直接写原生的sql语句
  • SQLAlchemy帮你自动优化sql语句
  • python代码和数据库sql语句解耦
  • 不必关心背后使用的是mysql还是oracle等数据库的区别
  • 最关键的:它符合程序员的懒人法则,封装一切代码和操作

在实际使用中,它有哪些缺点呢?

  • 需要记忆一堆的包、模块、类名、方法,和学门新语言差不多
  • 需要记住一堆固定的用法和流程
  • 需要巴拉巴拉记忆一坨坨规则

所以,如果你在实际工作中没有涉及这一块,那么忘了SQLAlchemy吧,不要让你有限的脑容量来记忆一些不常用的东西。等到你真需要用了,再学不迟。你唯一需要记住的就是有SQLAlchemy这么一个东西!
1、创建表

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5)

Base = declarative_base()

# 创建单表
class Users(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    extra = Column(String(16))

    __table_args__ = (
    UniqueConstraint('id', 'name', name='uix_id_name'),
        Index('ix_id_name', 'name', 'extra'),
    )


# 一对多
class Favor(Base):
    __tablename__ = 'favor'
    nid = Column(Integer, primary_key=True)
    caption = Column(String(50), default='red', unique=True)


class Person(Base):
    __tablename__ = 'person'
    nid = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=True)
    favor_id = Column(Integer, ForeignKey("favor.nid"))


# 多对多
class Group(Base):
    __tablename__ = 'group'
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)
    port = Column(Integer, default=22)


class Server(Base):
    __tablename__ = 'server'

    id = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(64), unique=True, nullable=False)


class ServerToGroup(Base):
    __tablename__ = 'servertogroup'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    server_id = Column(Integer, ForeignKey('server.id'))
    group_id = Column(Integer, ForeignKey('group.id'))


def init_db():
    Base.metadata.create_all(engine)


def drop_db():
    Base.metadata.drop_all(engine)

2、操作表

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5)

Base = declarative_base()

# 创建单表
class Users(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    extra = Column(String(16))

    __table_args__ = (
    UniqueConstraint('id', 'name', name='uix_id_name'),
        Index('ix_id_name', 'name', 'extra'),
    )

    def__repr__(self):
        return "%s-%s" %(self.id, self.name)

# 一对多
class Favor(Base):
    __tablename__ = 'favor'
    nid = Column(Integer, primary_key=True)
    caption = Column(String(50), default='red', unique=True)

    def__repr__(self):
        return "%s-%s" %(self.nid, self.caption)

class Person(Base):
    __tablename__ = 'person'
    nid = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=True)
    favor_id = Column(Integer, ForeignKey("favor.nid"))
    # 与生成表结构无关,仅用于查询方便
    favor = relationship("Favor", backref='pers')

# 多对多
class ServerToGroup(Base):
    __tablename__ = 'servertogroup'
    nid = Column(Integer, primary_key=True, autoincrement=True)
    server_id = Column(Integer, ForeignKey('server.id'))
    group_id = Column(Integer, ForeignKey('group.id'))
    group = relationship("Group", backref='s2g')
    server = relationship("Server", backref='s2g')

class Group(Base):
    __tablename__ = 'group'
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)
    port = Column(Integer, default=22)
    # group = relationship('Group',secondary=ServerToGroup,backref='host_list')


class Server(Base):
    __tablename__ = 'server'

    id = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(64), unique=True, nullable=False)

def init_db():
    Base.metadata.create_all(engine)

def drop_db():
    Base.metadata.drop_all(engine)


Session = sessionmaker(bind=engine)
session = Session()

obj = Users(name="alex0", extra='sb')
session.add(obj)
session.add_all([
    Users(name="alex1", extra='sb'),
    Users(name="alex2", extra='sb'),
])
session.commit()

session.query(Users).filter(Users.id > 2).delete()
session.commit()

session.query(Users).filter(Users.id > 2).update({"name" : "099"})
session.query(Users).filter(Users.id > 2).update({Users.name: Users.name + "099"}, synchronize_session=False)
session.query(Users).filter(Users.id > 2).update({"num": Users.num + 1}, synchronize_session="evaluate")
session.commit()

ret = session.query(Users).all()
ret = session.query(Users.name, Users.extra).all()
ret = session.query(Users).filter_by(name='alex').all()
ret = session.query(Users).filter_by(name='alex').first()

其它操作

# 条件
ret = session.query(Users).filter_by(name='alex').all()
ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()
ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()
ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()
ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()
ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()
from sqlalchemy import and_, or_
ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
ret = session.query(Users).filter(
    or_(
        Users.id < 2,
        and_(Users.name == 'eric', Users.id > 3),
        Users.extra != ""
    )).all()


# 通配符
ret = session.query(Users).filter(Users.name.like('e%')).all()
ret = session.query(Users).filter(~Users.name.like('e%')).all()

# 限制
ret = session.query(Users)[1:2]

# 排序
ret = session.query(Users).order_by(Users.name.desc()).all()
ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()

# 分组
from sqlalchemy.sql import func

ret = session.query(Users).group_by(Users.extra).all()
ret = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).all()

ret = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()

# 连表

ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()

ret = session.query(Person).join(Favor).all()

ret = session.query(Person).join(Favor, isouter=True).all()


# 组合
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union(q2).all()

q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union_all(q2).all()

下面贴一个大神写的基于SQLAlchemy和paramiko的堡垒机demo
堡垒机功能实现需求
业务需求:

  • 兼顾业务安全目标与用户体验,堡垒机部署后,不应使用户访问业务系统的访问变的复杂,否则工作将很难推进,因为没人喜欢改变现状,尤其是改变后生活变得更艰难
  • 保证堡垒机稳定安全运行, 没有100%的把握,不要上线任何新系统,即使有100%把握,也要做好最坏的打算,想好故障预案

功能需求:

  • 所有的用户操作日志要保留在数据库中
  • 每个用户登录堡垒机后,只需要选择具体要访问的设置,就连接上了,不需要再输入目标机器的访问密码
  • 允许用户对不同的目标设备有不同的访问权限,例:
  • 对10.0.2.34 有mysql 用户的权限
  • 对192.168.3.22 有root用户的权限
  • 对172.33.24.55 没任何权限
  • 分组管理,即可以对设置进行分组,允许用户访问某组机器,但对组里的不同机器依然有不同的访问权限    

设计表结构:

#_*_coding:utf-8_*_
__author__ = 'Alex Li'
 
from sqlalchemy import create_engine,Table
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String,ForeignKey,UniqueConstraint
from sqlalchemy.orm import relationship
from  sqlalchemy.orm import sessionmaker
from sqlalchemy import or_,and_
from sqlalchemy import func
from sqlalchemy_utils import ChoiceType,PasswordType
 
Base = declarative_base() #生成一个SqlORM 基类
 
 
engine = create_engine("mysql+mysqldb://root@localhost:3306/test",echo=False)
 
 
BindHost2Group = Table('bindhost_2_group',Base.metadata,
    Column('bindhost_id',ForeignKey('bind_host.id'),primary_key=True),
    Column('group_id',ForeignKey('group.id'),primary_key=True),
)
 
BindHost2UserProfile = Table('bindhost_2_userprofile',Base.metadata,
    Column('bindhost_id',ForeignKey('bind_host.id'),primary_key=True),
    Column('uerprofile_id',ForeignKey('user_profile.id'),primary_key=True),
)
 
Group2UserProfile = Table('group_2_userprofile',Base.metadata,
    Column('userprofile_id',ForeignKey('user_profile.id'),primary_key=True),
    Column('group_id',ForeignKey('group.id'),primary_key=True),
)
 
 
class UserProfile(Base):
    __tablename__ = 'user_profile'
    id = Column(Integer,primary_key=True,autoincrement=True)
    username = Column(String(32),unique=True,nullable=False)
    password = Column(String(128),unique=True,nullable=False)
    groups = relationship('Group',secondary=Group2UserProfile)
    bind_hosts = relationship('BindHost',secondary=BindHost2UserProfile)
 
    def__repr__(self):
        return "<UserProfile(id='%s',username='%s')>" % (self.id,self.username)
 
class RemoteUser(Base):
    __tablename__ = 'remote_user'
    AuthTypes = [
        (u'ssh-passwd',u'SSH/Password'),
        (u'ssh-key',u'SSH/KEY'),
    ]
    id = Column(Integer,primary_key=True,autoincrement=True)
    auth_type = Column(ChoiceType(AuthTypes))
    username = Column(String(64),nullable=False)
    password = Column(String(255))
 
    __table_args__ = (UniqueConstraint('auth_type', 'username','password', name='_user_passwd_uc'),)
 
    def__repr__(self):
        return "<RemoteUser(id='%s',auth_type='%s',user='%s')>" % (self.id,self.auth_type,self.username)
 
 
class Host(Base):
    __tablename__ = 'host'
    id = Column(Integer,primary_key=True,autoincrement=True)
    hostname = Column(String(64),unique=True,nullable=False)
    ip_addr = Column(String(128),unique=True,nullable=False)
    port = Column(Integer,default=22)
    bind_hosts = relationship("BindHost")
    def__repr__(self):
        return "<Host(id='%s',hostname='%s')>" % (self.id,self.hostname)
 
class Group(Base):
    __tablename__  = 'group'
    id = Column(Integer,primary_key=True,autoincrement=True)
    name = Column(String(64),nullable=False,unique=True)
    bind_hosts = relationship("BindHost",secondary=BindHost2Group, back_populates='groups' )
    user_profiles = relationship("UserProfile",secondary=Group2UserProfile )
 
    def__repr__(self):
        return "<HostGroup(id='%s',name='%s')>" % (self.id,self.name)
 
 
class BindHost(Base):
    '''Bind host with different remote user,       eg. 192.168.1.1 mysql passAbc123       eg. 10.5.1.6    mysql pass532Dr!       eg. 10.5.1.8    mysql pass532Dr!       eg. 192.168.1.1 root    '''
    __tablename__ = 'bind_host'
    id = Column(Integer,primary_key=True,autoincrement=True)
    host_id = Column(Integer,ForeignKey('host.id'))
    remoteuser_id = Column(Integer,ForeignKey('remote_user.id'))
    host = relationship("Host")
    remoteuser = relationship("RemoteUser")
    groups = relationship("Group",secondary=BindHost2Group,back_populates='bind_hosts')
    user_profiles = relationship("UserProfile",secondary=BindHost2UserProfile)
 
    __table_args__ = (UniqueConstraint('host_id', 'remoteuser_id', name='_bindhost_and_user_uc'),)
 
    def__repr__(self):
        return "<BindHost(id='%s',name='%s',user='%s')>" % (self.id,
                                                           self.host.hostname,
                                                           self.remoteuser.username
                                                                      )
 
 
Base.metadata.create_all(engine) #创建所有表结构
 
if __name__ == '__main__':
    SessionCls = sessionmaker(bind=engine) #创建与数据库的会话session class ,注意,这里返回给session的是个class,不是实例
    session = SessionCls()
    #h1 = session.query(Host).filter(Host.hostname=='ubuntu4').first()
    #hg1 = session.query(HostGroup).filter(HostGroup.name=='t2').first()
 
    #h2 = Host(hostname='ubuntu4',ip_addr='192.168.1.21')
    #h3 = Host(hostname='ubuntu5',ip_addr='192.168.1.24',port=20000)
    #hg= HostGroup(name='TestServers3',host_id=h3.id)
    #hg2= HostGroup(name='TestServers2',host_id=h2.id)
    #hg3= HostGroup(name='TestServers3')
    #hg4= HostGroup(name='TestServers4')
    #session.add_all([hg3,hg4])
    #h2.host_groups = [HostGroup(name="t1"),HostGroup(name="t2")]
    #h3.host_groups = [HostGroup(name="t2")]
    #h1.host_groups.append(HostGroup(name="t3") )
    #print(h1.host_groups)
    #print("hg1:",hg1.host.hostname)
    #join_res = session.query(Host).join(Host.host_groups).filter(HostGroup.name=='t1').group_by("Host").all()
    #print('join select:',join_res)
    #group_by_res = session.query(HostGroup, func.count(HostGroup.name )).group_by(HostGroup.name).all()
    #print("-------------group by res-----")
 
    '''    h1=Host(hostname='h1',ip_addr='1.1.1.1')    h2=Host(hostname='h2',ip_addr='1.1.1.2')    h3=Host(hostname='h3',ip_addr='1.1.1.3')    r1=RemoteUser(auth_type=u'ssh-passwd',username='alex',password='abc123')    r2=RemoteUser(auth_type=u'ssh-key',username='alex')    g1 = Group(name='g1')    g2 = Group(name='g2')    g3 = Group(name='g3')    session.add_all([h1,h2,h3,r1,r2])    session.add_all([g1,g2,g3])    b1 = BindHost(host_id=1,remoteuser_id=1)    b2 = BindHost(host_id=1,remoteuser_id=2)    b3 = BindHost(host_id=2,remoteuser_id=2)    b4 = BindHost(host_id=3,remoteuser_id=2)    session.add_all((b1,b2,b3,b4))    all_groups = session.query(Group).filter().all() #first()    all_bindhosts = session.query(BindHost).filter().all()    #h1 = session.query(BindHost).filter(BindHost.host_id==1).first()    #h1.groups.append(all_groups[1])    #print("h1:",h1)    #print("----------->",all_groups.name,all_groups.bind_hosts)    u1 = session.query(UserProfile).filter(UserProfile.id==1).first()    print('--user:',u1.bind_hosts)    print('--user:',u1.groups[0].bind_hosts)    #u1.groups = [all_groups[1] ]    #u1.bind_hosts.append(all_bindhosts[1])    #u1 = UserProfile(username='alex',password='123')    #u2 = UserProfile(username='rain',password='abc!23')    #session.add_all([u1,u2])    #b1 = BindHost()    session.commit()    #print(h2.host_groups)    '''

这里是另外一个版本,它可以用来做堡垒机的数据表

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy import create_engine,and_,or_,func,Table
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String,ForeignKey,UniqueConstraint,DateTime
from  sqlalchemy.orm import sessionmaker,relationship

Base = declarative_base() #生成一个SqlORM 基类

# 服务器账号和组
# HostUser2Group = Table('hostuser_2_group',Base.metadata,
#     Column('hostuser_id',ForeignKey('host_user.id'),primary_key=True),
#     Column('group_id',ForeignKey('group.id'),primary_key=True),
# )

# 用户和组关系表,用户可以属于多个组,一个组可以有多个人
UserProfile2Group = Table('userprofile_2_group',Base.metadata,
    Column('userprofile_id',ForeignKey('user_profile.id'),primary_key=True),
    Column('group_id',ForeignKey('group.id'),primary_key=True),
)

# 程序登陆用户和服务器账户,一个人可以有多个服务器账号,一个服务器账号可以给多个人用
UserProfile2HostUser= Table('userprofile_2_hostuser',Base.metadata,
    Column('userprofile_id',ForeignKey('user_profile.id'),primary_key=True),
    Column('hostuser_id',ForeignKey('host_user.id'),primary_key=True),
)


class Host(Base):
    __tablename__='host'
    id = Column(Integer,primary_key=True,autoincrement=True)
    hostname = Column(String(64),unique=True,nullable=False)
    ip_addr = Column(String(128),unique=True,nullable=False)
    port = Column(Integer,default=22)
    def__repr__(self):
        return  "<id=%s,hostname=%s, ip_addr=%s>" %(self.id,
                                                    self.hostname,
                                                    self.ip_addr)


class HostUser(Base):
    __tablename__ = 'host_user'
    id = Column(Integer,primary_key=True)
    AuthTypes = [
        (u'ssh-passwd',u'SSH/Password'),
        (u'ssh-key',u'SSH/KEY'),
    ]
    # auth_type = Column(ChoiceType(AuthTypes))
    auth_type = Column(String(64))
    username = Column(String(64),unique=True,nullable=False)
    password = Column(String(255))

    host_id = Column(Integer,ForeignKey('host.id'))
    
    # groups = relationship('Group',
    #                       secondary=HostUser2Group,
    #                       backref='host_list')

    __table_args__ = (UniqueConstraint('host_id','username', name='_host_username_uc'),)

    def__repr__(self):
        return  "<id=%s,name=%s>" %(self.id,self.username)


class Group(Base):
    __tablename__ = 'group'
    id = Column(Integer,primary_key=True)
    name = Column(String(64),unique=True,nullable=False)
    def__repr__(self):
        return  "<id=%s,name=%s>" %(self.id,self.name)


class UserProfile(Base):
    __tablename__ = 'user_profile'
    id = Column(Integer,primary_key=True)
    username = Column(String(64),unique=True,nullable=False)
    password = Column(String(255),nullable=False)
    # host_list = relationship('HostUser',
    #                       secondary=UserProfile2HostUser,
    #                       backref='userprofiles')
    # groups = relationship('Group',
    #                       secondary=UserProfile2Group,
    #                       backref='userprofiles')
    def__repr__(self):
        return  "<id=%s,name=%s>" %(self.id,self.username)


class AuditLog(Base):
    __tablename__ = 'audit_log'
    id = Column(Integer,primary_key=True)
    userprofile_id = Column(Integer,ForeignKey('user_profile.id'))
    hostuser_id = Column(Integer,ForeignKey('host_user.id'))
    action_choices2 = [
        (u'cmd',u'CMD'),
        (u'login',u'Login'),
        (u'logout',u'Logout'),
    ]
    action_type = Column(ChoiceType(action_choices2))
    #action_type = Column(String(64))
    cmd = Column(String(255))
    date = Column(DateTime)

    # user_profile = relationship("UserProfile")
    #bind_host = relationship("BindHost")


engine = create_engine("mysql+pymsql://root:123@localhost:3306/stupid_jumpserver",echo=False)
Base.metadata.create_all(engine) #创建所有表结构   

 

转自:http://www.cnblogs.com/feixuelove1009/p/5693899.html

分享到:
评论

相关推荐

    基于 Spring Boot rabbitmq redis mysql 的电商秒杀系统.zip

    基于 Spring Boot rabbitmq redis mysql 的电商秒杀系统.zip

    基于SpringBoot+RabbitMQ+Redis开发的秒杀系统源码+数据库.zip

    基于SpringBoot+RabbitMQ+Redis开发的秒杀系统源码+数据库.zip 下载即用无需修改。 基于SpringBoot+RabbitMQ+Redis开发的秒杀系统源码+数据库.zip 下载即用无需修改。基于SpringBoot+RabbitMQ+Redis开发的秒杀...

    基于python实现操作redis及消息队列

    本文将详细介绍如何使用Python的redis库来操作Redis以及处理消息队列。 首先,我们需要导入`redis`库,然后创建一个连接池`redisPool`,通过指定Redis服务器的IP地址、端口和数据库编号。例如: ```python import ...

    Python操作rabbitMQ的示例代码

    Python操作RabbitMQ是将Python编程语言与RabbitMQ消息中间件相结合,以实现应用程序间的高效、可靠的消息通信。RabbitMQ是一个基于AMQP(Advanced Message Queuing Protocol)的开源消息代理,它由Erlang语言开发,...

    SpringBoot+redis+RabbitMq整合实例

    这两个模板是操作Redis和RabbitMQ的主要工具。 5. 实现业务逻辑:在需要使用Redis的地方,可以直接注入RedisTemplate进行数据操作。对于RabbitMQ,可以定义消息监听器或者发送消息的方法,根据业务需求进行消息的...

    基于 Canal 的 MySql RabbitMQ Redis/memcached/mongodb

    6.保证对应关系的简单性:一个mysql表对应一个 redis实例(redis单线程,多实例保证分流不阻塞),关联关系数据交给接口业务 数据:mysql-&gt;binlog-&gt;MQ-&gt;redis(不过期、关闭RDB、AOF保证读写性能) (nosql数据仅用...

    基于rabbitmq+redis实现分布式事务项目源码.zip

    基于rabbitmq+redis实现分布式事务项目源码.zip 基于rabbitmq+redis实现分布式事务项目源码.zip 基于rabbitmq+redis实现分布式事务项目源码.zip 基于rabbitmq+redis实现分布式事务项目源码.zip 基于rabbitmq+redis...

    python+redis+rabbitmq搭建一个简单的秒杀系统

    在构建一个基于Python的秒杀系统时,我们通常会结合使用Redis和RabbitMQ来处理高并发场景下的数据存储和消息传递。以下是对这个系统的详细解释: **Redis** 是一个高性能的键值数据库,常被用作缓存和数据结构...

    基于python+redis+rabbitmq搭建一个简单的秒杀系统

    用python+redis+rabbitmq搭建一个简单的秒杀系统。由于是从项目中抽离出来的一个部分,所以省略了其他业务功能接口,只包含秒杀系统相关程序。 ## 基础流程、架构 ### 基础预设 0. mysql中存储商品信息、订单...

    如何通过Python实现RabbitMQ延迟队列

    因为系统本身一直在用RabbitMQ做异步处理任务的中间件,所以想到是否可以利用RabbitMQ实现延迟队列。功夫不负有心人,RabbitMQ虽然没有现成可用的延迟队列,但是可以利用其两个重要特性来实现之:1、Time To Live...

    基于SpringBoot的Rabbitmq和Redis调用示例

    本示例将聚焦于如何在SpringBoot项目中集成并使用RabbitMQ和Redis,这两种都是常用的中间件,分别用于消息队列和数据缓存。 RabbitMQ是基于AMQP(Advanced Message Queuing Protocol)的消息代理和队列服务器,它...

    python rabbitmq

    ### Python与RabbitMQ知识点详解 #### 一、引言 在现代软件开发尤其是分布式系统设计中,消息队列作为实现服务间异步通信的重要工具之一,被广泛应用。其中,RabbitMQ是一款非常流行的消息中间件,支持多种消息...

    rabbitmq-nagios-plugins:基于python的RabbitMq插件集

    rabbitmq-nagios-插件一组基于python的Rabbitmq插件。 安装: easy_install pynagios 将这些脚本复制到$ NAGIOS_HOME / libexec / 配置您的nagios系统。 注意,这些脚本需要HOSTNAME而不是HOSTADDRESS来区分Rabbit...

    flask-rabbitmq:与RabbitMQ pika库结合的简单Python Flask

    flask-rabbitmq是一个框架,简化了python操作Rabbitmq的框架,可以与Flask很好地结合。 因此,您无需考虑基础操作 安装 该项目已提交给Pypi,可通过pip安装: $ pip install flask-rabbitmq 产品特点 开始关注Flask...

    秒杀系统(SpringBoot + Redis + RabbitMq)

    业务和异常因为时间关系就直接写在了controller,根据需要修改位置,另外RabbitMq的异步处理和多线程业务,看需要也可以开启。秒杀系统(SpringBoot + Redis + RabbitMq)

    亲测可用基于 SpringBoot+Maven+Mybatis+Redis+RabbitMQ 高并发秒杀系统

    基于 SpringBoot+Maven+Mybatis+Redis+RabbitMQ 高并发商城秒杀系统; 开发工具IntelliJ IDEA 2017.3.1 x64; 项目搭建: 1、下载代码 将项目加载到IDEA里面 2、运行sql文件夹下的sql文件 3、到src/main/resources下...

Global site tag (gtag.js) - Google Analytics