您当前的位置:首页 > 计算机 > 编程开发 > Python

PostgreSQL和Excel的数据合并

时间:04-22 来源: 作者: 点击数:

用于将Excel中的数据和PG数据库中的数据进行联合处理,涉及:

  • 数据库访问
  • Excel文件访问
  • 字典和列表的使用
  • 文件读写
  • 异常处理

Python3完整源码如下:

# -*- coding: utf-8 -*-

import os

import psycopg2
import xlrd
import xlwt
# import pandas as pd
# import openpyxl

# Lookup tables filled from the database by read_from_db(), each keyed by a
# display name: facility / category / provider / model / specs / level map
# name -> id (or dictionary value); the two *_code_dict tables map name -> code.
g_facility_dict = {}
g_category_dict = {}
g_provider_dict = {}
g_model_dict = {}
g_specs_dict = {}
g_level_dict = {}
g_facility_code_dict = {}
g_category_code_dict = {}
# Device rows read from the database
g_device_asset_in_db = []
# Device rows read from the input workbook (despite the "xml" in the name,
# the input is an Excel file opened with xlrd)
g_device_asset_in_xml = []
# Generated SQL statements, plus the workbook rows that could not be converted
g_device_to_delete = []
g_device_to_insert = []
g_device_to_update = []
g_device_error = []
# Facility currently being numbered and the running device order inside it
g_cache_facility_name = ""
g_cache_device_order_in_facility = 1
# Highest device id handed out so far
g_max_dev_id = 0


# Source table (read) and target table (named in the generated SQL)
G_TABLE_SOURCE = "amp_device_asset"
G_TABLE_TARGET = "amp_device_asset"


# Reset the per-file working state
def make_clean():
    """Rebind every per-file module global to a fresh, empty value.

    Clears the workbook rows, the three generated-SQL lists, the error rows
    and the facility numbering cache. The database-derived lookup tables and
    ``g_max_dev_id`` are left alone.
    """
    global g_device_asset_in_xml, g_device_to_delete, g_device_to_insert
    global g_device_to_update, g_device_error
    global g_cache_facility_name, g_cache_device_order_in_facility

    # Facility numbering cache
    g_cache_facility_name, g_cache_device_order_in_facility = "", 1
    # Work lists: each one gets its own new list object
    g_device_asset_in_xml, g_device_error = [], []
    g_device_to_delete, g_device_to_insert, g_device_to_update = [], [], []


# Open the database connection
def connect_db():
    """Connect to PostgreSQL and return a ``(connection, cursor)`` pair.

    The connection settings are the placeholder values hard-coded below.
    """
    settings = {
        "database": "my_dbname",
        "user": "my_username",
        "password": "my_passwd",
        "host": "my_ipaddr",
        "port": "5432",
    }
    link = psycopg2.connect(**settings)
    return link, link.cursor()


# Close the database connection
def disconnect_db(connection, cursor):
    """Commit the open transaction, then release the cursor and the connection.

    Both objects are closed even when the commit (or closing the cursor)
    raises, so a failed commit no longer leaks the connection. The exception
    itself still propagates to the caller.

    Args:
        connection: object providing ``commit()`` and ``close()``.
        cursor: object providing ``close()``.
    """
    try:
        connection.commit()
    finally:
        try:
            cursor.close()
        finally:
            connection.close()


# Load the lookup tables and all device ledger rows from the database
def read_from_db(cursor):
    """Fill the module-level lookup dicts and ``g_device_asset_in_db``.

    Each lookup query returns two columns; the second column becomes the
    dict key and the first the value. Existing entries are kept, so calling
    this twice appends the device rows a second time.

    Args:
        cursor: open database cursor providing ``execute`` and ``fetchall``.
    """

    def load_name_map(sql, mapping):
        # Run one two-column query and merge it into *mapping*.
        cursor.execute(sql)
        mapping.update({row[1]: row[0] for row in cursor.fetchall()})

    # Facility name -> id
    load_name_map("SELECT id,name FROM amp_facility_asset;", g_facility_dict)

    # Category name -> id
    load_name_map("SELECT id,name FROM amp_category_define;", g_category_dict)

    # Provider (manufacturer) name -> id
    load_name_map("SELECT id,name FROM amp_service_provider;", g_provider_dict)

    # Model display value -> dictionary value
    load_name_map("SELECT a.value, a.display_value from sys_dict_item a "
                  "left join sys_dict_def b on a.dict_id = b.id "
                  "where b.name = 'EQ_MODEL' order by a.value;",
                  g_model_dict)

    # Specs display value -> dictionary value
    load_name_map("SELECT a.value, a.display_value from sys_dict_item a "
                  "left join sys_dict_def b on a.dict_id = b.id "
                  "where b.name = 'EQ_SPECS' order by a.value;",
                  g_specs_dict)

    # Level (rating) display value -> dictionary value
    load_name_map("SELECT a.value, a.display_value from sys_dict_item a "
                  "left join sys_dict_def b on a.dict_id = b.id "
                  "where b.name = 'EQ_LEVEL' order by a.value;",
                  g_level_dict)

    # Facility name -> first character of the facility-type code
    load_name_map("select substr(di.code,1,1), f.name from amp_facility_asset f "
                  "left join sys_dict_item di on f.facility_type = di.value "
                  "left join sys_dict_def dd on dd.id = di.dict_id "
                  "where f.facility_type in (1,2,3,4,5) and dd.description = '设施分类';",
                  g_facility_code_dict)

    # Category name -> category code
    load_name_map("select category_code,name from amp_category_define where category_type = 240001 and pid > 0;",
                  g_category_code_dict)

    # Device rows, ordered by facility name, device name, id
    cursor.execute(f"SELECT a.id,a.name as name,f.name as f_name FROM {G_TABLE_SOURCE} a "
                   "left join amp_facility_asset f on a.belong_facility = f.id "
                   "order by f.name, a.name, a.id;")
    g_device_asset_in_db.extend(
        {"id": row[0], "name": row[1], "f_name": row[2]}
        for row in cursor.fetchall()
    )


# Count the devices already registered under one facility
def get_dev_cnt_under_facility(cursor, f_name):
    """Return how many ``amp_device_asset`` rows belong to facility *f_name*.

    The facility name is sent as a bound query parameter instead of being
    pasted into the SQL text, so names containing an apostrophe can neither
    break nor alter the statement.

    Args:
        cursor: DB-API cursor using the ``%s`` parameter style (psycopg2).
        f_name: facility name, as it appears in ``amp_facility_asset.name``.

    Returns:
        The count from the first result row, or 0 if the query yields no row.
    """
    cursor.execute("SELECT count(*) FROM amp_device_asset where belong_facility = (select id from "
                   "amp_facility_asset where name = %s);", (f_name,))
    row = cursor.fetchone()
    return row[0] if row else 0


# Column layout of the input sheet: column index -> key in the row dict.
# The order matters: the row dicts are later written back out positionally.
_SHEET_COLUMNS = (
    "facility_name", "facility_code", "col_2", "device_name", "category_name",
    "device_code", "dev_level", "dispatch_no", "specs_name", "model_name",
    "provider_name", "build_date", "col_12", "col_13", "col_14", "col_15",
    "col_16", "col_17", "power_scope", "col_19", "col_20",
)


def _clean_cell(value):
    """Return the cell as text, trimmed and with embedded newlines removed."""
    return str(value).strip().replace("\n", "")


def _normalize_kv(text):
    """Spell the kilovolt unit consistently as ``kV``."""
    return text.replace("KV", "kV").replace("kv", "kV").replace("Kv", "kV")


# Read every device ledger row from the Excel workbook
def read_xml_file(file_name, flag, fill=False):
    """Read the first sheet of an Excel workbook into a list of row dicts.

    Despite the name, the input is an Excel file opened with ``xlrd``. Every
    sheet row (header row included) becomes one dict holding the 21 columns
    of ``_SHEET_COLUMNS`` as cleaned text, followed by a ``"flag"`` entry.

    Normalisation applied per row:
      * a blank device name is replaced by the category name; if that is
        blank too, the file name and row index are printed;
      * ``KV`` / ``kv`` / ``Kv`` become ``kV`` in facility_name and power_scope;
      * category "主变压器" becomes "变压器" and "GIS设备" becomes "GIS";
      * a trailing ".0" is removed from dispatch_no. Only a trailing ".0" is
        removed, so a value such as "10.05" is no longer turned into "105".

    Args:
        file_name: path of the workbook to read.
        flag: tag stored under ``"flag"`` in every returned row.
        fill: when true, column 18 (power_scope) is not read and is left blank.

    Returns:
        list of dicts, one per sheet row, in sheet order.
    """
    handle = xlrd.open_workbook(file_name)
    table = handle.sheet_by_index(0)

    print(f"文件名:{file_name}")
    print(f"总行数:{str(table.nrows)}")
    print(f"总列数:{str(table.ncols)}")

    columns = {}
    for index, key in enumerate(_SHEET_COLUMNS):
        if key == "power_scope" and fill:
            columns[key] = [""] * table.nrows
        else:
            columns[key] = table.col_values(index)

    list_dict = []
    for i in range(table.nrows):
        raw = {key: columns[key][i] for key in _SHEET_COLUMNS}
        if raw["device_name"] is None or not str(raw["device_name"]).strip():
            if str(raw["category_name"]).strip():
                raw["device_name"] = raw["category_name"]
            else:
                print(f"{file_name}  L: {i}")

        record = {key: _clean_cell(value) for key, value in raw.items()}
        record["facility_name"] = _normalize_kv(record["facility_name"])
        record["power_scope"] = _normalize_kv(record["power_scope"])
        record["category_name"] = (record["category_name"]
                                   .replace("主变压器", "变压器").replace("GIS设备", "GIS"))
        if record["dispatch_no"].endswith(".0"):
            record["dispatch_no"] = record["dispatch_no"][:-2]
        record["flag"] = flag
        list_dict.append(record)
    return list_dict


# Resolve one field of a workbook row
def fetch_from_dict(item, key_name, value_type, dict_name=None):
    """Return the value of ``item[key_name]``, optionally mapped through a dict.

    Args:
        item: row dict; must contain *key_name*.
        key_name: field to read from *item*.
        value_type: ``"str"`` returns the raw field; ``"num"`` looks the field
            up in *dict_name*; anything else yields ``"null"``.
        dict_name: lookup table used when *value_type* is ``"num"``.

    Returns:
        The raw field, the mapped value, or the string ``"null"`` when the
        field has no entry in *dict_name*.

    Raises:
        Exception: if a KeyError escapes the lookup. Membership is tested
            before indexing, so this is a defensive path.
    """
    current_item_name = key_name
    current_item_str = item[key_name]
    try:
        if value_type == "str":
            return current_item_str
        if value_type != "num":
            return "null"
        if current_item_str in dict_name:
            return dict_name[current_item_str]
        return "null"
    except KeyError as kerr:
        print("[Error] when makeup sql: err=%s\n file=%s\t item_name=%s\t item_str=%s"
              % (kerr, item['flag'], current_item_name, current_item_str))
        # Record the reason in the remark column and keep the row for the error report
        item["col_20"] = f"{key_name} 在数据库中未找到定义"
        g_device_error.append(item)
        raise Exception("[Error] : item_name=%s\t item_str=%s" % (current_item_name, current_item_str))


# Build the serial number (SN) of one device
def generate_sn(cursor, item):
    """Return the SN string for *item* and advance the per-facility counter.

    The running order restarts from the database count (plus one) whenever
    the facility differs from the previously processed one, and otherwise
    increases by one. The counter advances even when the SN is rejected.

    Args:
        cursor: database cursor, used to count existing devices of a facility.
        item: row dict with ``category_name``, ``facility_name``,
            ``device_name`` and ``flag``.

    Raises:
        Exception: when the category or facility code is unknown (the SN then
            contains "null"); the row is appended to ``g_device_error`` first.
    """
    global g_cache_facility_name, g_cache_device_order_in_facility

    var_category = fetch_from_dict(item, "category_name", "num", g_category_code_dict)
    var_facility = fetch_from_dict(item, "facility_name", "num", g_facility_code_dict)

    current_facility_name = item["facility_name"]
    if current_facility_name == g_cache_facility_name:
        g_cache_device_order_in_facility += 1
    else:
        # New facility: remember it, then continue after the rows already stored
        g_cache_facility_name = current_facility_name
        g_cache_device_order_in_facility = get_dev_cnt_under_facility(cursor, current_facility_name) + 1

    sn = f"电-{var_category}-{g_cache_device_order_in_facility}-{item['flag']}{var_facility}01"
    if "null" not in sn:
        return sn

    print("[Error] when makeup sn=%s,\t 文件=%s\t 设施名称=%s\t 设备名称=%s"
          % (sn, item['flag'], item['facility_name'], item['device_name']))
    # If both lookups failed, the facility message overwrites the category one
    if var_category is None or var_category == 'null':
        item["col_20"] = "字段 '设备类别' 在数据库中未找到定义"
    if var_facility is None or var_facility == 'null':
        item["col_20"] = "字段 '设施名称' 在数据库中未找到定义"
    g_device_error.append(item)
    raise Exception("[Error] when makeup sn: err=%s,\t file=%s\t facility=%s\t device=%s"
                    % (sn, item['flag'], item['facility_name'], item['device_name']))


def _sql_text(value):
    """Escape *value* for use inside a single-quoted SQL string literal."""
    return str(value).replace("'", "''")


# Reconcile the database rows with the workbook rows, one record at a time
def process_data(cursor):
    """Generate the delete and insert statements for the current workbook.

    Steps:
      1. Each workbook row is matched to a database row with the same device
         name and facility name. A matched database row is removed from
         ``g_device_asset_in_db`` and raises ``g_max_dev_id`` if its id is larger.
      2. Every database row still unmatched gets a ``delete`` statement,
         appended to ``g_device_to_delete``.
      3. Every workbook row gets the next id and an ``insert`` statement,
         appended to ``g_device_to_insert``. Rows whose statement cannot be
         built are printed and skipped; the helpers record them in
         ``g_device_error``.

    Header rows (facility name equal to "设施名称") are ignored throughout.

    Fixes relative to the earlier version: the returned list no longer holds
    repeated copies of the statements (it used to be extended with the whole
    accumulated list on every iteration), and text values have single quotes
    doubled so an apostrophe in the workbook cannot break the generated SQL.

    Args:
        cursor: database cursor, forwarded to ``generate_sn``.

    Returns:
        list of SQL strings: all delete statements followed by all inserts.
    """
    global g_max_dev_id

    # Step 1: copy the database id onto workbook rows that already exist
    for dev_in_xml in g_device_asset_in_xml:
        if dev_in_xml["facility_name"] == "设施名称":
            continue
        for dev_in_db in g_device_asset_in_db:
            same_device = (dev_in_xml["device_name"] is not None
                           and dev_in_xml["device_name"] == dev_in_db["name"])
            same_facility = (dev_in_xml["facility_name"] is not None
                             and dev_in_xml["facility_name"] == dev_in_db["f_name"])
            if same_device and same_facility:
                dev_in_xml["id"] = dev_in_db["id"]
                # Removing while iterating is safe here: we break immediately
                g_device_asset_in_db.remove(dev_in_db)
                g_max_dev_id = max(g_max_dev_id, dev_in_db["id"])
                print("=======break:01")
                break

    # Step 2: database rows with no counterpart in the workbook are deleted
    for dev_in_db in g_device_asset_in_db:
        g_device_to_delete.append(f"delete from {G_TABLE_TARGET} where id = {dev_in_db['id']};")

    # Step 3: assign a fresh id to every workbook row and insert it.
    # NOTE(review): the branch that issued an UPDATE (dispatch_no,
    # power_scope_text) for rows already matched in step 1 was disabled in the
    # original, so matched rows are re-inserted under a new id as well.
    # Confirm this is intended before running the generated SQL.
    for dev_in_xml in g_device_asset_in_xml:
        if dev_in_xml["facility_name"] == "设施名称":
            continue
        # The id is consumed even if the row fails below; the error report
        # relies on every row carrying an "id" entry.
        g_max_dev_id += 1
        dev_in_xml["id"] = g_max_dev_id
        try:
            var_device = _sql_text(fetch_from_dict(dev_in_xml, "device_name", "str"))
            var_qr = _sql_text(fetch_from_dict(dev_in_xml, "device_code", "str"))
            var_facility = fetch_from_dict(dev_in_xml, "facility_name", "num", g_facility_dict)
            var_provider = fetch_from_dict(dev_in_xml, "provider_name", "num", g_provider_dict)
            var_category = fetch_from_dict(dev_in_xml, "category_name", "num", g_category_dict)
            var_model = fetch_from_dict(dev_in_xml, "model_name", "num", g_model_dict)
            var_specs = fetch_from_dict(dev_in_xml, "specs_name", "num", g_specs_dict)
            var_dispatch = _sql_text(fetch_from_dict(dev_in_xml, "dispatch_no", "str"))
            var_power = _sql_text(fetch_from_dict(dev_in_xml, "power_scope", "str"))
            var_level = fetch_from_dict(dev_in_xml, "dev_level", "num", g_level_dict)
            var_build = _sql_text(fetch_from_dict(dev_in_xml, "build_date", "str"))
            # generate_sn runs last: it advances the per-facility counter
            var_sn = _sql_text(f"{generate_sn(cursor, dev_in_xml)}@{dev_in_xml['facility_code']}")

            sql_str = f"insert into {G_TABLE_TARGET}(id, name, belong_facility, sn, mng_user_id, live_state, " \
                      f"model, specs, category, manufacture, dispatch_no, power_scope_text, "\
                      f"level, build_date, qr_code) values" \
                      f"({g_max_dev_id},'{var_device}',{var_facility},'{var_sn}',1,1," \
                      f"{var_model},{var_specs},{var_category},{var_provider},'{var_dispatch}','{var_power}',"\
                      f"{var_level},'{var_build}','{var_qr}');"
        except Exception as err:
            # The helpers have already recorded the row in g_device_error
            print(err)
            continue
        g_device_to_insert.append(sql_str)

    return g_device_to_delete + g_device_to_insert


# Write a text file, one statement per line
def write_sql_file(file_name, str_list):
    """Write every element of *str_list* to *file_name*, one per line (UTF-8).

    The parent directory is created when missing, so the first run on a clean
    checkout no longer fails just because the output folder does not exist.
    I/O errors are reported on stdout and not raised.

    Args:
        file_name: path of the file to (over)write.
        str_list: iterable of values; each is converted with ``str``.
    """
    try:
        folder = os.path.dirname(file_name)
        if folder:
            os.makedirs(folder, exist_ok=True)
        with open(file_name, 'w', encoding='utf-8') as sql_file:
            sql_file.writelines(f"{sql_str}\n" for sql_str in str_list)
    except OSError as err:
        print(f'[Error] when write sql to file: {err}')


# Write the rejected rows to an Excel workbook
def write_xml_file(file_name, str_list=None):
    """Save *str_list* as sheet ``error_data`` of a new ``.xls`` workbook.

    Row 0 holds the 21 column titles. Each dict in *str_list* fills one
    further row with its first 21 values in insertion order; any trailing
    bookkeeping entries are not written. Selecting by the number of titles
    replaces the earlier "all but the last two values" rule, which silently
    depended on every row carrying exactly two extra keys.

    The parent directory of *file_name* is created when missing.

    Args:
        file_name: path of the workbook to write.
        str_list: list of row dicts; ``None`` (the default) writes titles only.
    """
    rows = [] if str_list is None else str_list
    handle = xlwt.Workbook()
    sheet = handle.add_sheet('error_data')

    title = ['设施名称', '设施编码', '设施类别', '设备名称', '设备类别', '设备编号', '设备评级', '调度号', '设备规格', '设备型号',
             '制造厂家', '投运日期', '出厂编号', '资产所属', '电源接引', '接入方式', '供电方式', '设备分类', '供电范围', '主要参数', '备注']
    for col, caption in enumerate(title):
        sheet.write(0, col, caption)

    for row_no, record in enumerate(rows, start=1):
        for col, cell in enumerate(list(record.values())[:len(title)]):
            sheet.write(row_no, col, cell)

    folder = os.path.dirname(file_name)
    if folder:
        os.makedirs(folder, exist_ok=True)
    handle.save(file_name)


"""
# 将表格写入Excel
def export_excel(export):
    # 将字典列表转换为DataFrame
    pf = pd.DataFrame(list(export))
    # 指定字段顺序
    order = ['file', 'facility_name', 'device_name', 'sn']
    pf = pf[order]
    # 将列名替换为中文
    columns_map = {
        'file': '文件',
        'facility_name': '设施名称',
        'device_name': '设备名称',
        'sn': 'sn'
    }
    pf.rename(columns=columns_map, inplace=True)
    # 指定生成的Excel表格名称
    file_path = pd.ExcelWriter('error.xlsx')
    # 替换空单元格
    pf.fillna(' ', inplace=True)
    # 输出
    pf.to_excel(file_path, encoding='utf-8', index=False)
    # 保存表格
    file_path.save()
"""


def print_hi(name):
    """Print a one-line greeting for *name* to standard output."""
    greeting = f'Hi, {name}'
    print(greeting)


# Process one input workbook
def process_file(file_name, cursor, flag):
    """Run the whole pipeline for one workbook and write its output files.

    Resets the per-file state, loads the workbook rows sorted by facility
    name, generates the SQL, then writes the delete / insert / update
    statement files and the error workbook, all named after *flag*.

    Args:
        file_name: path of the Excel workbook to read.
        cursor: database cursor passed on to ``process_data``.
        flag: tag stored on every row and used as output file prefix.
    """
    global g_device_asset_in_xml

    make_clean()
    g_device_asset_in_xml = read_xml_file(file_name, flag)
    # Sorting by facility keeps rows of one facility adjacent
    g_device_asset_in_xml.sort(key=lambda r: r['facility_name'])
    process_data(cursor)

    sql_outputs = (
        (f"output/sql/{flag}-delete.sql", g_device_to_delete),
        (f"output/sql/{flag}-insert.sql", g_device_to_insert),
        (f"output/sql/{flag}-update.sql", g_device_to_update),
    )
    for path, statements in sql_outputs:
        write_sql_file(path, statements)
    write_xml_file(f"output/errors/{flag}-errors.xls", g_device_error)


# Script entry point
if __name__ == '__main__':
    # print_hi('PyCharm')

    # Load the lookup tables and the device ledger (id, name, f_name) from the database
    g_conn, g_curs = connect_db()
    try:
        read_from_db(g_curs)
        print(f"find {len(g_device_asset_in_db)} device in db")

        # No ``global`` statement here: this code already runs at module
        # scope, and declaring a name global after it has been assigned
        # earlier in the module is a SyntaxError on Python 3.6+.

        # First batch (kept for reference):
        #   g_max_dev_id = 0
        #   process_file('input/S-P1.xls', g_curs, 'S')
        #   process_file('input/N-P1.xls', g_curs, 'N')
        #   process_file('input/D-P1.xls', g_curs, 'D')
        #   process_file('input/L-P1.xls', g_curs, 'L')

        # Second batch: ids continue after the highest id already in use
        g_max_dev_id = 9404
        process_file('input/feedback/S-P2.xls', g_curs, 'S')
        process_file('input/feedback/N-P2.xls', g_curs, 'N')
        process_file('input/feedback/D-P2.xls', g_curs, 'D')
        # The path used to lack the ".xls" extension its three siblings have
        process_file('input/feedback/L-P2.xls', g_curs, 'L')
    finally:
        # Release the connection even if processing failed
        disconnect_db(g_conn, g_curs)

# See PyCharm help at https://www.jetbrains.com/help/pycharm/

方便获取更多学习、工作、生活信息请关注本站微信公众号城东书院 微信服务号城东书院 微信订阅号
推荐内容
相关内容
栏目更新
栏目热门