Pandas读写MySQL数据库(支持百万条读取)

Ft 2020-07-14 AM 195℃ 0条

Pandas读写MySQL数据库

我们需要以下三个库来实现Pandas读写MySQL数据库:
pandas、sqlalchemy、pymysql
其中,

  • pandas模块提供了read_sql_query()函数实现了对数据库的查询,to_sql()函数实现了对数据库的写入。并不需要实现新建MySQL数据表。
  • sqlalchemy模块实现了与不同数据库的连接,
  • pymysql模块则使得Python能够操作MySQL数据库。

一、读取数据库
示例:

#DBAPI2 connection方式连接
import mysql.connector
import pandas as pd

conn=mysql.connector.connect(host='127.0.0.1',user='root', passwd='password', db='test')
sql='select * from mytable'
df=pd.read_sql(sql,conn)

示例:

#SQLAlchemy engine方式
from sqlalchemy import create_engine
import pandas as pd
engine = create_engine('mysql+mysqlconnector://root:password@10.39.211.198:3306/test')
df=pd.read_sql(sql,engine )

二、写入数据库
1、使用 SQL 语句来创建表结构
示例 get_schema()并不是一个公开的方法,没有文档可以查看

import sqlalchemy

print(pd.io.sql.get_schema(df, 'emp_backup', keys='EMP_ID', 
   dtype={'EMP_ID': sqlalchemy.types.BigInteger(),
       'GENDER': sqlalchemy.types.String(length=20),
       'AGE': sqlalchemy.types.BigInteger(),
       'EMAIL':  sqlalchemy.types.String(length=50),
       'PHONE_NR':  sqlalchemy.types.String(length=50),
       'EDUCATION':  sqlalchemy.types.String(length=50),
       'MARITAL_STAT':  sqlalchemy.types.String(length=50),
       'NR_OF_CHILDREN': sqlalchemy.types.BigInteger()
       }, con=engine))

2、to_sql() 方法使用 append 方式插入数据
示例 pandas写入数据到mysql只有SQLAlchemy engine方式:

#SQLAlchemy engine方式
from sqlalchemy import create_engine
import pandas as pd
engine = create_engine('mysql+mysqlconnector://root:password@10.39.211.198:3306/test')
df.to_sql('mytable',con=engine,if_exists='append',index=False)

三、补充:
engine.execute(sql)可以直接执行sql语句:

from sqlalchemy import create_engine 
  
 engine = create_engine('mysql+pymysql://root:123456@localhost:3306/test')
sql = "DROP TABLE IF EXISTS example"
engine.execute(sql)

如果用pymysql,则必须用cursor,读者可以对比一下。

import pymysql
from sqlalchemy import create_engine
 
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123456', db='test')
# engine = create_engine('mysql+pymysql://root:123456@localhost:3306/test')
sql = "DROP TABLE IF EXISTS test_input"
cursor = conn.cursor()
cursor.execute(sql)

四、函数详解
1、to_sql
函数:

DataFrame.to_sql(name, con, schema=None, if_exists='fail', index=True, index_label=None, chunksize=None, dtype=None, method=None)

参数:

  • name: 输出的表名
  • con: 与read_sql中相同
  • if_exits: 三个模式:fail,若表存在,则不输出;replace:若表存在,覆盖原来表里的数据;append:若表存在,将数据写到原表的后面。默认为fail
  • index:是否将df的index单独写到一列中
  • index_label:指定列作为df的index输出,此时index为True
  • chunksize: 同read_sql
  • dtype: 指定列的输出到数据库中的数据类型。字典形式储存:{column_name: sql_dtype}。常见的数据类型有
  • sqlalchemy.types.INTEGER(), sqlalchemy.types.NVARCHAR(),sqlalchemy.Datetime()等,具体数据类型可以参考这里

还是以写到mysql数据库为例:

df.to_sql(name='table', 
          con=con, 
          if_exists='append', 
          index=False,
          dtype={'col1':sqlalchemy.types.INTEGER(),
                 'col2':sqlalchemy.types.NVARCHAR(length=255),
                 'col_time':sqlalchemy.DateTime(),
                 'col_bool':sqlalchemy.types.Boolean
          })

注:如果不提供dtype,to_sql会自动根据df列的dtype选择默认的数据类型输出,比如字符型会以sqlalchemy.types.TEXT类型输出,相比NVARCHAR,TEXT类型的数据所占的空间更大,所以一般会指定输出为NVARCHAR;而如果df的列的类型为np.int64时,将会导致无法识别并转换成INTEGER型,需要事先转换成int类型(用map,apply函数可以方便的转换)。

2、read_sql
函数:

pandas.read_sql(sql, con, index_col=None, coerce_float=True, params=None, parse_dates=None, columns=None, chunksize=None)

效果:将SQL查询或数据库表读入DataFrame。

示例:

import pymysql
import pandas as pd
 
con = pymysql.connect(host="127.0.0.1",user="root",password="password",db="world")

data_sql=pd.read_sql("SQL查询语句",con)

data_sql.to_csv("test.csv")

参数:

  • sql : string or SQLAlchemy Selectable (select or text object);要执行的SQL查询或表名。
  • con : SQLAlchemy connectable (engine/connection) or database string URI;或DBAPI2连接(后备模式),使用SQLAlchemy可以使用该库支持的任何数据库。如果是DBAPI2对象,则仅支持sqlite3。
  • index_col : string or list of strings, optional, default: None;要设置为索引的列(MultiIndex)。
  • coerce_float : boolean, default True;尝试将非字符串,非数字对象(如decimal.Decimal)的值转换为浮点,这对SQL结果集很有用
  • params : list, tuple or dict, optional, default: None;List of parameters to pass to execute method. 【The syntax used to pass parameters is database driver dependent. Check your database driver documentation for which of the five syntax styles, described in PEP 249’s paramstyle, is supported. Eg. for psycopg2, uses %(name)s so use params={‘name’ : ‘value’}
  • parse_dates : list or dict, default: None

    • List of column names to parse as dates.要解析为日期的列名列表。
    • Dict of {column_name: format string} where format string is strftime compatible in case of parsing string times, or is one of (D, s, ns, ms, us) in case of parsing integer timestamps.在解析字符串时,格式字符串是strftime兼容的格式字符串,或者是(D、s、ns、ms、us),以防解析整型时间戳。
    • Dict of {column_name: arg dict}, where the arg dict corresponds to the keyword arguments of pandas.to_datetime() Especially useful with databases without native Datetime support, such as SQLite.
      {column_name:arg dict}的字典,其中arg dict对应于pandas.to_datetime()的关键字参数。对于没有本机Datetime支持的数据库(如SQLite)特别有用。
  • columns : list, default: None;从SQL表中选择的列名列表(仅在读取表时使用)。
  • chunksize : int, default None;如果指定,则返回一个迭代器,其中chunksize是要包含在每个块中的行数。

3、read_sql_table
函数:

pandas.read_sql_table(table_name, con, schema=None, index_col=None, coerce_float=True, parse_dates=None, columns=None, chunksize=None)[source]`

效果:将SQL数据库表读入DataFrame。给定一个表名和一个SQLAlchemy可连接,返回一个DataFrame。此功能不支持DBAPI连接。

示例:

import pandas as pd
import pymysql
from sqlalchemy import create_engine
 
con = create_engine('mysql+pymysql://user_name:password@127.0.0.1:3306/database_name')
data = pd.read_sql_table("table_name", con)
data.to_csv("table_name.csv")

参数:

  • table_name : string 数据库中SQL表的名称。
  • con : SQLAlchemy connectable (or database string URI) 不支持SQLite DBAPI连接模式。
  • schema : string, default None 要查询的数据库中的SQL模式的名称(如果数据库flavor支持此功能)。如果为None(默认值),则使用默认架构。
  • index_col : string or list of strings, optional, default: None 要设置为索引的列(MultiIndex)。
  • coerce_float : boolean, default True 尝试将非字符串,非数字对象(如decimal.Decimal)的值转换为浮点值。可能导致精度损失。
  • parse_dates : list or dict, default: None

    • List of column names to parse as dates.要解析为日期的列名列表。
    • Dict of {column_name: format string} where format string is strftime compatible in case of parsing string times or is one of (D, s, ns, ms, us) in case of parsing integer timestamps.{column_name:format string}的字典,其中格式字符串在解析字符串时间时与strftime兼容,或者在解析整数时间戳的情况下是(D,s,ns,ms,us)之一。
    • Dict of {column_name: arg dict}, where the arg dict corresponds to the keyword arguments of pandas.to_datetime() Especially useful with databases without native Datetime support, such as SQLite.
      {column_name:arg dict}的字典,其中arg dict对应于pandas.to_datetime()的关键字参数。对于没有本机Datetime支持的数据库(如SQLite)特别有用。
  • columns : list, default: None 从SQL表中选择的列名列表
  • chunksize : int, default None 如果指定,则返回一个迭代器,其中chunksize是要包含在每个块中的行数。

参考:
http://docs.sqlalchemy.org/en/latest/core/type_basics.html#sql-standard-and-multiple-vendor-types
http://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_sql.html
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html
http://docs.sqlalchemy.org/en/latest/core/engines.html
http://docs.sqlalchemy.org/en/latest/core/type_basics.html#sql-standard-and-multiple-vendor-types
http://stackoverflow.com/questions/30631325/writing-to-mysql-database-with-pandas-using-sqlalchemy-to-sql
http://stackoverflow.com/questions/5687718/how-can-i-insert-data-into-a-mysql-database
http://stackoverflow.com/questions/32235696/pandas-to-sql-gives-unicode-decode-error
http://stackoverflow.com/questions/34383000/pandas-to-sql-all-columns-as-nvarchar

标签: python

非特殊说明,本博所有文章均为博主原创。

评论啦~