`

ruby调用python实现大数据量导出的案例

阅读更多
先看看一般情况下ruby导出csv的代码案例:
respond_to do |format|
      format.html # index.html.erb
      format.xml  { render :xml => @mobile_units}
      format.csv {
        columns = Ultra::MobileUnit.export_columns_name
        datas = UI::Grid.new(Ultra::MobileUnit, @mobile_units).to_csv
(columns) do |col, data|
            data[col]
        end
        send_data(datas,:type=>'text/csv;header=persent',:filename => 'wifi_mobile_units.csv')
      }
    end
    end


由于数据量巨大,需要考虑到将导出任务转移到后台去执行,经过查找发现了python中有一个很好用的Celery异步执行工具。
def export_mobile
    #params[:param_values]为一个执行sql脚本
    param_values=params[:param_values]
    @celery_task = CeleryTask.new({:name => "report_#{Time.now.to_i}.csv", :task_type => "mobile_export",:result => "30", :user_id => current_user.id,:params => param_values})
    @celery_task.save!()
    #生成一条Celery任务记录到数据库表
    task_file = File.join(RAILS_ROOT,"python_tasks","ultra_export_main.py")
    #system服务器掉调用python脚本执行请求。
    system "python #{task_file} #{@celery_task.id}"
    @celery_id = @celery_task.id
    respond_to do |format|
      format.html { render :layout => 'simple'}
    end
  end



#python接收
# encoding: utf-8
import sys
from models import CeleryTask,session
from export_oracle import export_excel_oracle
from export import export_excel
from import_site import import_xls
from amend import amend_xls
from export_xls import export_amend_data
from export_alarm import export_history_alarms

if __name__ == "__main__":
    id = sys.argv[1]
    task = session.query(CeleryTask).get(id)
    export_type = task.task_type
    if export_type in ["export_perf_report", "export_alarm_report", "export_config_report"]:
        print(">>> export report")
        res = export_excel_oracle.delay(id)
    elif export_type in ["ap_query_export", "site_query_export", "alarms_export"]:
        print(">>export")
        res = export_excel.delay(id)
    # 更新 celery_tasks记录的task_id
    task.task_id = res.task_id
    session.commit()
    


models模块定义:
#接收参数,生成cession同事插入cerery表中做task_id。
# encoding: utf-8
from celeryconfig import CELERY_RESULT_DBURI
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import Text
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

engine = create_engine(CELERY_RESULT_DBURI+"?charset=utf8", pool_recycle=600)
Session = sessionmaker()
Session.configure(bind=engine)
session = Session()

Base = declarative_base()
class CeleryTask(Base):
    __tablename__ = 'celery_tasks'
    id = Column(Integer, primary_key=True)
    name = Column(String(255))      # 任务名称
    task_id = Column(String(255))   # 关联celery_taskmeta表中的task_id
    task_type = Column(String(50))  # 任务类型
    user_id = Column(Integer)       # 操作用户
    excel_file = Column(String(255)) # 导入的文件名称,或导出的文件名称
    result = Column(String(255))    # 任务结果
    error_file = Column(String(255)) # 导入错误数据时,生成的错误数据文件名称
    params = Column(Text)           # 调用存储过程的参数字符串


mysql导出案例:

import sys,os
from celery.task import task
from models import session,CeleryTask
import MySQLdb
from celeryconfig import MYSQL_CHARSET,MYSQL_DB,MYSQL_HOST,MYSQL_PASSWD,MYSQL_PORT,MYSQL_USER
from export_help import get_columns
from export_config import ap_columns, ap_columns_dict, ap_state, fit_ap, indoor, managed, site_columns, site_columns_dict, is_checked, pro_status, alarm_columns,alarm_columns_dict
import csv
import codecs

@task(time_limit=1800, soft_timeout_limit=1200)
def export_excel(task_id):
    if sys.getdefaultencoding() != 'utf-8':
        reload(sys)
        sys.setdefaultencoding('utf-8')
    task = session.query(CeleryTask).get(task_id)
    task.result = "35"
    session.commit()

    conn = MySQLdb.connect(host=MYSQL_HOST, user=MYSQL_USER, passwd=MYSQL_PASSWD, db=MYSQL_DB, charset=MYSQL_CHARSET,port=MYSQL_PORT)
    cursor = conn.cursor(cursorclass = MySQLdb.cursors.DictCursor)
    export_type = task.task_type
    if export_type == "site_query_export":
        cursor.execute("drop table if exists port_ap_temp")
        cursor.execute('''create temporary table if not exists port_ap_temp
        (port int not null,ap_num int,fit_ap_num int,fat_ap_num int,on_line int,off_line int,
        index port_ap_temp_port (port))''')
        cursor.execute('''insert into port_ap_temp
        (select port,count(id) ap_num,count(case ap_fit when 1 then id else null end) fit_ap_num,
        count(case ap_fit when 1 then null else id end) fat_ap_num,
        count(case when ap_fit = 1 and managed_state = 1 then id else null end) on_line,
        count(case when ap_fit = 1 and managed_state = 1 then null else id end) off_line
        from mit_aps group by port)''')
        cursor.execute("drop table if exists port_sw_temp")
        #生成一张临时表来处理数据
        cursor.execute('''create temporary table if not exists port_sw_temp
        (port int not null,sw_num int,index port_sw_temp_port (port))''')
        cursor.execute("insert into port_sw_temp (select port, count(*) sw_num from mit_switchs group by port)")
    cursor.execute(task.params)
    results = cursor.fetchall()
    task.result = "50"
    session.commit()

    export_excel = export_type+u'_'+str(task.id)+".csv"
    root_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),'..')
    export_path = os.path.join(root_path,'public','export')
    if not os.path.exists(export_path):
        os.mkdir(export_path)
    csv_file_dest = os.path.join(export_path,export_excel)

    if export_type == "ap_query_export":
        write_aps(results,csv_file_dest)
    elif export_type == "site_query_export":
        code_data = get_dict_data(cursor)
        write_sites(results,csv_file_dest,code_data)
    task.error_file = export_excel
    task.result = "100"
    session.commit()
    cursor.close()
    conn.close()

def write_aps(results,csv_file_dest):
    outputFile = codecs.open(csv_file_dest,'w', 'gbk') # 'wb'
    output = csv.writer(outputFile, dialect='excel')
    ap_cols,ap_cols_dict = get_columns("ap")
    export_ap_columns = ap_columns + ap_cols
    ap_columns_dict.update(ap_cols_dict)
    headers = []
    for col in export_ap_columns:
        headers.append(ap_columns_dict[col])
    output.writerow(headers)
    for result in results:
        r = []
        try:
            for col, col_name in enumerate(export_ap_columns):
                if col_name == "transfer_type":
                    r.append(result.get("transfer_type_name"," "))
                elif col_name == "power_type":
                    r.append(result.get("power_type_name"," "))
                elif col_name == "ap_state":
                    r.append(ap_state.get(result.get(col_name," ")))
                elif col_name == "managed_state":
                    r.append(managed.get(result.get(col_name," ")))
                elif col_name == "is_indoor":
                    r.append(indoor.get(result.get(col_name," ")))
                else:
                    r.append(result.get(col_name," "))
            output.writerow(r)
        except:
            print "export ap except"
    outputFile.close()


def write_sites(results,csv_file_dest,code_data):
    outputFile = codecs.open(csv_file_dest,'w', 'gbk') # 'wb'
    output = csv.writer(outputFile, dialect='excel')
    site_cols,site_cols_dict = get_columns("site")
    export_site_columns = site_columns + site_cols
    site_columns_dict.update(site_cols_dict)
    headers = []
    for col in export_site_columns:
        headers.append(site_columns_dict[col])
    output.writerow(headers)
    for result in results:
        r = []
        try:
            for col, col_name in enumerate(export_site_columns):
                if col_name in ["ap_type","sw_type","phase","document","net_state","site_level","transfer_type"]:
                    r.append(code_data.get(result.get(col_name," ")))
                elif col_name == "is_checked":
                    r.append(is_checked.get(result.get(col_name," ")))
                elif col_name == "project_status":
                    r.append(pro_status.get(result.get(col_name," ")))
                else:
                    r.append(result.get(col_name," "))
            output.writerow(r)
        except:
            print "export site except"

    outputFile.close()


if __name__ == "__main__":
    id = sys.argv[1]
    export_excel(id)


其中excell表头名,在引用文件中定义:
# -*- coding: utf-8 -*-
ap_columns = ["province",
              "city",
              "town",
              "hp_id",
              "port_name",
              "site_type_name",
              "ap_cn",
              "ap_level_name",
              "mac",
              "longitude",
              "address",
              "managed_state",
              "created_at"]

ap_columns_dict = {
    "province": u'省',
    "city": u'地市',
    "town": u'区县',
    "ac_ip": u"所属AC的IP地址",
    "sw_cn": u"所属Switch",
    "uplink_bandwidth": u"上联带宽(Mbps)",
    "port_z": u"上联设备端口",
    "odf_z": u"对端ODF",
    "port_a": u"本端端口",
    "odf_a": u"本端ODF",
    "integration_unit": u"集成商",
    "supply_name": u"代维厂家",
    "remark": u"备注",
    "address": u"位置",
    "managed_state": u"在线状态",
    "created_at": u"创建日期",
}




oracle调用实例:

# encoding: utf-8
import sys, os
from celery.task import task
from models import session,CeleryTask
from xlwt import Workbook,XFStyle
import cx_Oracle
from celeryconfig import ORACLE_DSN, ORACLE_USER, ORACLE_PASSWD, REPORT_EXPORT_NUM
from export_help import oracle_encode, format_data, get_format_dict
from ultra_export_config_apoffline import apoffline_columns_dict
import csv
import codecs

@task(time_limit=1800, soft_timeout_limit=1200)
def export_excel_apoffline(task_id):
    if sys.getdefaultencoding() != 'utf-8':
        reload(sys)
        sys.setdefaultencoding('utf-8')
    os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'
    task = session.query(CeleryTask).get(task_id)
    task.result = "35"
    session.commit()
    conn = cx_Oracle.connect(ORACLE_USER, ORACLE_PASSWD, ORACLE_DSN)
    encoding = conn.encoding
    cursor = conn.cursor()
    #连接oracle库
    cursor.execute(task.params)
    #执行sql脚本
    results = cursor.fetchall()
    #获取返回值
    task.result = "50"
    session.commit()
    export_type = task.task_type
    export_name = task.name
    export_excel = export_name+u'_'+str(task.id)+".csv"
    root_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),'..')
    export_path = os.path.join(root_path,'public','export')
    if not os.path.exists(export_path):
        os.mkdir(export_path)
    #定义添加csv文件
    csv_file_dest = os.path.join(export_path,export_excel)
    #获取sql脚本列值序列
    colname = [tuple[0] for tuple in cursor.description]
    outputFile = codecs.open(csv_file_dest,'w', 'gbk')
    output = csv.writer(outputFile, dialect='excel')
    #写入csv表头
    headers = []
    for col in colname:
        headers.append(apoffline_columns_dict[col])
    output.writerow(headers)
    #写入查询值
    for result in results:
        r = []
        try:
            for col_name in result:
                r.append(col_name)
            output.writerow(r)
        except:
            print "export apoffline except"
    outputFile.close()
    task.error_file = export_excel
    task.result = "100"
    session.commit()
    session.close()
    cursor.close()
    conn.close()

分享到:
评论

相关推荐

    shell+ruby.zip

    这个压缩包可能是为了解决在大数据量的Redis实例中,快速查找和导出特定前缀键值的问题。 描述中提到的场景是,我们需要从海量的Redis Key中筛选出具有特定前缀的Key,并将它们的值保存到文件。如果单个文件大小...

    java开源包5

    ftp4j是一个FTP客户端Java类库,实现了FTP客户端应具有的大部分功能文件(包括上传和下 载),浏览远程FTP服务器上的目录和文件,创建、删除、重命,移动远程目录和文件。ftp4j提供多种方式连接到远程FTP服务器包括...

    gdal-1.6.1版本.rar

    - 开发GIS桌面应用,实现数据的导入导出、编辑和展示。 - 构建地图应用的后端服务,处理大量地理数据的存储和处理。 在1.6.1版本中,可能包含了该版本特有的改进和修复,例如性能优化、新格式支持或者错误修复。...

    hbase用于查询客户端工具

    通过编写MapReduce作业,可以对HBase表进行大规模的数据导入和导出,或者执行复杂的数据分析任务。 在实际使用中,选择哪个客户端工具取决于具体的需求和使用场景。例如,如果需要快速原型开发或简单的数据操作,...

    java开源包1

    ftp4j是一个FTP客户端Java类库,实现了FTP客户端应具有的大部分功能文件(包括上传和下 载),浏览远程FTP服务器上的目录和文件,创建、删除、重命,移动远程目录和文件。ftp4j提供多种方式连接到远程FTP服务器包括...

    java开源包3

    ftp4j是一个FTP客户端Java类库,实现了FTP客户端应具有的大部分功能文件(包括上传和下 载),浏览远程FTP服务器上的目录和文件,创建、删除、重命,移动远程目录和文件。ftp4j提供多种方式连接到远程FTP服务器包括...

    java开源包4

    ftp4j是一个FTP客户端Java类库,实现了FTP客户端应具有的大部分功能文件(包括上传和下 载),浏览远程FTP服务器上的目录和文件,创建、删除、重命,移动远程目录和文件。ftp4j提供多种方式连接到远程FTP服务器包括...

    前端后端和数据库代码资源

    `后台代码.zip`可能包含服务器端代码,通常由Node.js、Java、Python或Ruby等服务器端语言编写。后端开发者负责处理用户请求,验证数据,执行业务逻辑,以及与数据库进行通信。这部分代码可能包括路由、控制器、模型...

    mencached 缓存

    开发者可以方便地在代码中调用API进行数据的存取。 - **设置缓存**: 通常使用`set`命令来存储键值对,例如在PHP中,`memcache_set()`函数可用于此目的。 - **获取缓存**: 使用`get`命令获取缓存,例如PHP中的`...

    MongoDB管理与开发精要《红丸出品》

    - 随着互联网技术的发展,传统的关系型数据库在处理大规模数据时遇到了瓶颈,特别是在高并发和大数据量的场景下表现不佳。 - NoSQL数据库由于其灵活的数据模型、可扩展性以及高性能等特点,在许多领域得到了广泛应用...

    java开源包11

    ftp4j是一个FTP客户端Java类库,实现了FTP客户端应具有的大部分功能文件(包括上传和下 载),浏览远程FTP服务器上的目录和文件,创建、删除、重命,移动远程目录和文件。ftp4j提供多种方式连接到远程FTP服务器包括...

    java开源包2

    ftp4j是一个FTP客户端Java类库,实现了FTP客户端应具有的大部分功能文件(包括上传和下 载),浏览远程FTP服务器上的目录和文件,创建、删除、重命,移动远程目录和文件。ftp4j提供多种方式连接到远程FTP服务器包括...

    java开源包6

    ftp4j是一个FTP客户端Java类库,实现了FTP客户端应具有的大部分功能文件(包括上传和下 载),浏览远程FTP服务器上的目录和文件,创建、删除、重命,移动远程目录和文件。ftp4j提供多种方式连接到远程FTP服务器包括...

    java开源包10

    ftp4j是一个FTP客户端Java类库,实现了FTP客户端应具有的大部分功能文件(包括上传和下 载),浏览远程FTP服务器上的目录和文件,创建、删除、重命,移动远程目录和文件。ftp4j提供多种方式连接到远程FTP服务器包括...

    java开源包8

    ftp4j是一个FTP客户端Java类库,实现了FTP客户端应具有的大部分功能文件(包括上传和下 载),浏览远程FTP服务器上的目录和文件,创建、删除、重命,移动远程目录和文件。ftp4j提供多种方式连接到远程FTP服务器包括...

    java开源包7

    ftp4j是一个FTP客户端Java类库,实现了FTP客户端应具有的大部分功能文件(包括上传和下 载),浏览远程FTP服务器上的目录和文件,创建、删除、重命,移动远程目录和文件。ftp4j提供多种方式连接到远程FTP服务器包括...

    java开源包9

    ftp4j是一个FTP客户端Java类库,实现了FTP客户端应具有的大部分功能文件(包括上传和下 载),浏览远程FTP服务器上的目录和文件,创建、删除、重命,移动远程目录和文件。ftp4j提供多种方式连接到远程FTP服务器包括...

    java开源包101

    ftp4j是一个FTP客户端Java类库,实现了FTP客户端应具有的大部分功能文件(包括上传和下 载),浏览远程FTP服务器上的目录和文件,创建、删除、重命,移动远程目录和文件。ftp4j提供多种方式连接到远程FTP服务器包括...

    Java资源包01

    ftp4j是一个FTP客户端Java类库,实现了FTP客户端应具有的大部分功能文件(包括上传和下 载),浏览远程FTP服务器上的目录和文件,创建、删除、重命,移动远程目录和文件。ftp4j提供多种方式连接到远程FTP服务器包括...

Global site tag (gtag.js) - Google Analytics