CSV形式で渡される会計データや宛先リストを効率よく操作する際、pandasは便利なツールとして知られています。しかし、データ量が膨大になると、メモリ不足の壁に直面することも少なくありません。そんな時、SQLiteを活用すれば、データの絞り込みや集計が手軽に行え、さらにpandasとの連携で分析もスムーズに進みます。
本記事では、複数のCSVファイルをSQLiteに登録したり、逆にSQLiteのテーブルをCSVファイルに書き出す方法、pandas から直接SQLiteに検索クエリを発行し、結果をDataFrameに受け取る方法を紹介します。
更に、今回紹介する内容を簡単に実行できるよう、コピペで使える便利な自作クラスも作成しましたので、併せて紹介します。
SQLiteとpandasの相性の良さを活かしたい方、大量データの扱いにお悩みの方もぜひ参考にしてください。
pandasとSQLiteの連携
pandasとSQLiteを組み合わせるメリット
- 大量データの扱いが簡単に
pandasはデータフレームの操作に優れていますが、メモリに依存するため、大量データではメモリ不足に陥ることがあります。SQLiteを使用することで、SQLクエリで必要なデータだけをメモリにロードできるため、効率的にデータ処理が可能です。 - 柔軟なデータ操作
pandasの強力なデータ操作機能と、SQLiteのSQLクエリを組み合わせることで、より高度で柔軟なデータ分析ができます。たとえば、SQLiteでデータを事前集計し、pandasで可視化や詳細分析を行うことが可能です。 - 簡単にデータベースを構築できる
SQLiteはファイルベースのデータベースで、セットアップが不要。これにより、コードを書く敷居が低く、簡単に試せるのが魅力です。
必要なライブラリとインストール
必要なライブラリは次の2つです。
ライブラリ名 | インストール方法 |
---|---|
pandas | pip install pandas |
sqlite3 | インストール不要(Python に標準搭載されているため) |
使う場合は、次の通りインポートします。
import pandas as pd
import sqlite3
CSVファイルの内容をテーブルに登録
数十~数百MB程度の小さなCSVファイルを読み込む場合は、from_csv()
にて丸ごとメモリに読み込み、t_sql()
でテーブルに書き出す方法が便利です。
既に同じ名前のテーブルが存在する場合、if_exists
に "replace"を指定すると上書き、"append"を指定すると追加が可能です。
df.to_sql(テーブル名, conn, if_exists="replace", index=False)
import pandas as pd
import sqlite3
# CSV を読み込む
df = pd.read_csv("d:/mydata.csv")
# SQLiteに指定したテーブル名で保存する
with sqlite3.connect("d:/example.db") as conn:
df.to_sql("my_table", conn, if_exists="replace", index=False)
数GB~数十GBの巨大CSVの場合、read_csv() の chunksize に読み込みたい行数を指定し、ループで処理する方法を使います。chunksize を大きくするほと処理速度は上がりますが、メモリ不足の危険性が高まるため、適切な値を指定しなければなりません。初期値として10000程度にしておき、必要に応じて調整するのが良いかと思います。
pd.read_csv(CSVファイル名, chunksize = 10000)
import pandas as pd
import sqlite3
# CSVを指定行数づつ読み込んで、テーブルに追加していく
with sqlite3.connect("d:/example.db") as conn:
table_name = "my_table"
conn.execute(f"DROP TABLE IF EXISTS {table_name}")
for chunk in pd.read_csv("d:/mydata.csv", chunksize=10000):
chunk.to_sql(table_name, conn, if_exists="append", index=False)
print(f"{len(chunk)}行をデータベースに追加しました。")
1つの巨大なCSVを chunksize でサイズで分割しながら読み込む場合、データによっては精度が落ちる場合があります。これは、read_csv() でファイルを読み込むごとに、各カラムのデータ型が推論されるためです。
例えば、先頭100件が整数で、残りが実数の場合、先頭100件で整数型のテーブルが作成されるため、それ以降は整数に丸められる可能性があります。
これを避けるには、1度にメモリに読み込める両にCSVを分割しておくか、Create Table (後述)で型を指定しておくなどの対策が必要です。
pandas から SQLクエリを実行してデータを取得
read_sql_query()
を使うことで、pandas から任意のSQLクエリを実行し、結果を DataFrame で受け取ることができます。
pd.read_sql_query( SQLクエリ , conn )
import pandas as pd
import sqlite3
# SQLiteデータベースに接続
with sqlite3.connect("d:/example.db") as conn:
df = pd.read_sql_query("SELECT * FROM my_table", conn)
テーブルの内容をCSVに出力
テーブルのサイズがそれほど大きくない場合、 pandas の read_sql_query()
でSQLクエリを実行し、 to_csv()
でCSVファイルに出力する方法が便利です。
import pandas as pd
import sqlite3
# SQLiteに指定したテーブル名で保存する
with sqlite3.connect("d:/example.db") as conn:
df = pd.read_sql_query("SELECT * FROM my_table", conn)
# CSVファイルに書き出す
df.to_csv("d:/mydata.csv", index=False)
テーブルのサイズが大きい場合は、何回かに分けてCSVファイルに書き出す必要があります。この場合は SQLite の fetchmeny()
を使って、指定行数づつ取り出し、DataFrameに変換して to_csv()
で出力する方法が便利です。
import os
import pandas as pd
import sqlite3
database_file = "d:/example.db"
output_file = "d:/mydata.csv"
batch_size = 10000 # 一度に取得する行数
# 既にファイルがあれば削除
if os.path.exists(output_file):
os.remove(output_file)
with sqlite3.connect(database_file) as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM my_table") # 全データ対象のクエリ
header = [description[0] for description in cursor.description] # ヘッダー名を取得
while True:
# fetchmanyでデータを少量ずつ取り出す
rows = cursor.fetchmany(10000)
if not rows:
break # データがなくなったら終了
# DataFrame に変換
df = pd.DataFrame(rows, columns=header)
df.to_csv("d:/mydata.csv", index=False, mode="a")
CSVファイルからテーブルを自動生成
巨大なCSVファイルを分割しながら read_csv() する際、最初に読み込んだデータによってカラムのデータ型が推論されるため、データによっては精度が落ちる可能性があるとこは先ほど指摘しました。
かと言ってゼロからCreate文を作成するのは面倒です。
read_csv() でCSVファイルを読みこませ、推論結果から Create文を作成すれば、これを修正するだけで済むので、作業が非常に楽になります。
import pandas as pd
# CSVファイルのパスとテーブル名を指定
file_path = "D:/mydata.csv"
table_name = "MyTable"
# CSVファイルを読み込む
df = pd.read_csv(file_path, encoding='shift_jis')
# カラム名とデータ型を取得
columns = df.columns
dtypes = df.dtypes
# SQLのデータ型に変換
sql_dtypes = []
for column, dtype in zip(columns, dtypes):
if 'int' in str(dtype):
sql_dtypes.append(f"{column} INTEGER")
elif 'float' in str(dtype):
sql_dtypes.append(f"{column} REAL")
elif 'object' in str(dtype):
sql_dtypes.append(f"{column} TEXT")
elif 'datetime' in str(dtype):
sql_dtypes.append(f"{column} DATETIME")
else:
sql_dtypes.append(f"{column} TEXT") # デフォルトはTEXT型
# CREATE TABLE文を生成
create_table_sql = f"CREATE TABLE {table_name} (\n " + ",\n ".join(sql_dtypes) + "\n);"
# 結果の表示
print(create_table_sql)
下記は出力結果の一例です。
CREATE TABLE MyTable (
従業員番号 INTEGER,
年齢 INTEGER,
住所 TEXT,
体重 REAL
);
便利な自作クラス
SQLiteUtilクラスの使い方
# データベースのパスを指定
db_path = "D:/LocalCache.db"
# SQLiteUtilのインスタンスを作成
util = SQLiteUtil(db_path)
# サンプルデータの作成(従業員データ)
data = {
'社員番号': [1, 2, 3],
'名前': ['田中太郎', '佐藤花子', '鈴木次郎'],
'年齢': [30, 25, 35],
'住所': ['東京都', '大阪府', '北海道'],
'メールアドレス': ['taro.tanaka@example.com', 'hanako.sato@example.com', 'jiro.suzuki@example.com']
}
df = pd.DataFrame(data)
# DataFrameをCSVファイルとして保存
csv_path = "D:/mydata.csv"
df.to_csv(csv_path, index=False, encoding='shift_jis')
# CSVファイルからテーブルを作成(上書き)
util.from_csv(csv_path, "MyTable") # デフォルトで上書き
# CSVファイルからテーブルにデータを追加
util.from_csv(csv_path, "MyTable", is_append=True) # データを追加
# DataFrameからデータをテーブルに追加
new_data = {
'社員番号': [4, 5, 6],
'名前': ['中村一郎', '高橋二郎', '伊藤三郎'],
'年齢': [40, 45, 50],
'住所': ['福岡県', '愛知県', '神奈川県'],
'メールアドレス': ['ichiro.nakamura@example.com', 'jiro.takahashi@example.com', 'saburo.ito@example.com']
}
new_df = pd.DataFrame(new_data)
util.to_table(new_df, "MyTable", is_append=True) # データを追加
# クエリを実行して結果を取得
query = "SELECT * FROM MyTable WHERE 年齢 > 20"
result_df = util.execute_query(query) # クエリ実行
print(result_df)
# テーブルをCSVファイルにエクスポート
output_path = "D:/exported_data.csv"
util.to_csv("MyTable", output_path, encoding='utf-8') # エンコーディングをutf-8に指定してエクスポート
# CSVファイルからCREATE TABLE文を生成
create_table_sql = util.generate_create_table_sql(csv_path, "MyTable")
print(create_table_sql) # CREATE TABLE文を表示
SQLiteUtilのリファレンス
メソッド名 | 説明 | パラメータ |
---|---|---|
init(db_path) | 初期化メソッド。データベースのパスを設定します。 | db_path: データベースのパス |
connect() | データベースに接続し、接続オブジェクトを返します。 | なし |
from_csv( file_path, table_name, encoding='shift_jis', is_append=False, chunksize=10000 ) | CSVファイルからデータベーステーブルを作成または追加します。 | file_path: CSVファイルのパス table_name: 作成するテーブルの名前 encoding: エンコーディング is_append: Trueの場合はデータを追加、 Falseの場合はテーブルを上書き chunksize: チャンクサイズ |
to_table( df, table_name, is_append=False ) | DataFrameからデータベーステーブルにデータを追加または上書きします。 | df: DataFrameオブジェクト table_name: データを挿入するテーブルの名前 is_append: Trueの場合はデータを追加、 Falseの場合は上書き |
execute_query(query) | SQLクエリを実行し、その結果をDataFrameとして返します。 | query: 実行するSQLクエリ |
to_csv( table_name, output_path, encoding='shift_jis', chunksize=10000 ) | データベーステーブルをCSVファイルにエクスポートします。 | table_name: エクスポートするテーブルの名前 output_path: 出力するCSVファイルのパス encoding: エンコーディング chunksize: チャンクサイズ |
generate_create_table_sql( file_path, table_name, encoding='shift_jis' ) | CSVファイルからCREATE TABLE文を生成します。 | file_path: CSVファイルのパス table_name: 作成するテーブルの名前 encoding: エンコーディング |
SQLiteUtilクラスのソースコード
import os
import sqlite3
import pandas as pd
class SQLiteUtil:
def __init__(self, db_path):
"""
初期化メソッド。データベースのパスを設定します。
"""
self.db_path = db_path
def connect(self):
"""
データベースに接続します。
"""
return sqlite3.connect(self.db_path)
def from_csv(self, file_path, table_name, encoding='shift_jis', is_append=False, chunksize=10000):
"""
CSVファイルからデータベーステーブルを作成または追加します。
:param file_path: CSVファイルのパス
:param table_name: 作成するテーブルの名前
:param encoding: CSVファイルのエンコーディング(デフォルトはshift_jis)
:param is_append: Trueの場合はデータを追加、Falseの場合はテーブルを上書き(デフォルトはFalse)
:param chunksize: チャンクサイズ(デフォルトは10000)
"""
# 上書きまたは追加の動作を決定
if_exists = 'append' if is_append else 'replace'
# チャンクごとにCSVファイルを読み込み、データベースに書き込む
with self.connect() as conn:
for chunk in pd.read_csv(file_path, encoding=encoding, chunksize=chunksize):
chunk.to_sql(table_name, conn, if_exists=if_exists, index=False)
if_exists = 'append' # 最初のチャンク以降はデータを追加
action = "追加" if is_append else "作成(上書き)"
print(f"テーブル '{table_name}' が正常に{action}されました。")
def to_table(self, df, table_name, is_append=False):
"""
DataFrameからデータベーステーブルにデータを追加または上書きします。
:param df: DataFrameオブジェクト
:param table_name: データを挿入するテーブルの名前
:param is_append: Trueの場合はデータを追加、Falseの場合は上書き(デフォルトはFalse)
"""
# 上書きまたは追加の動作を決定
if_exists = 'append' if is_append else 'replace'
# データベースに接続してデータを挿入
with self.connect() as conn:
df.to_sql(table_name, conn, if_exists=if_exists, index=False)
action = "追加" if is_append else "作成(上書き)"
print(f"データがテーブル '{table_name}' に正常に{action}されました。")
def execute_query(self, query):
"""
SQLクエリを実行し、その結果をDataFrameとして返します。
:param query: 実行するSQLクエリ
:return: クエリ結果のDataFrame
"""
# データベースに接続してクエリを実行
with self.connect() as conn:
result = pd.read_sql_query(query, conn)
return result
def to_csv(self, table_name, output_path, encoding='shift_jis', chunksize=10000):
"""
データベーステーブルをCSVファイルにエクスポートします。
:param table_name: エクスポートするテーブルの名前
:param output_path: 出力するCSVファイルのパス
:param encoding: エクスポートするCSVファイルのエンコーディング(デフォルトはshift_jis)
:param chunksize: チャンクサイズ(デフォルトは10000)
"""
# 既存ファイルの存在を確認
if os.path.exists(output_path):
print(f"警告: ファイル '{output_path}' は既に存在し、上書きされます。")
# データベースに接続してテーブルをCSVファイルにエクスポート
with self.connect() as conn:
with open(output_path, 'w', encoding=encoding, newline='') as file:
for chunk in pd.read_sql_query(f"SELECT * FROM {table_name}", conn, chunksize=chunksize):
chunk.to_csv(file, index=False, header=file.tell()==0, encoding=encoding)
print(f"テーブル '{table_name}' が '{output_path}' に正常にエクスポートされました。")
def generate_create_table_sql(self, file_path, table_name, encoding='shift_jis'):
"""
CSVファイルからCREATE TABLE文を生成します。
:param file_path: CSVファイルのパス
:param table_name: 作成するテーブルの名前
:param encoding: CSVファイルのエンコーディング(デフォルトはshift_jis)
:return: CREATE TABLE文
"""
# CSVファイルを読み込む
df = pd.read_csv(file_path, encoding=encoding)
# カラム名とデータ型を取得
columns = df.columns
dtypes = df.dtypes
# SQLのデータ型に変換
sql_dtypes = []
for column, dtype in zip(columns, dtypes):
if pd.api.types.is_integer_dtype(dtype):
sql_dtypes.append(f"{column} INTEGER")
elif pd.api.types.is_float_dtype(dtype):
sql_dtypes.append(f"{column} REAL")
elif pd.api.types.is_object_dtype(dtype):
sql_dtypes.append(f"{column} TEXT")
elif pd.api.types.is_datetime64_any_dtype(dtype):
sql_dtypes.append(f"{column} DATETIME")
else:
sql_dtypes.append(f"{column} TEXT") # デフォルトはTEXT型
# CREATE TABLE文を生成
create_table_sql = f"CREATE TABLE {table_name} (\n " + ",\n ".join(sql_dtypes) + "\n);"
return create_table_sql
まとめ
この記事では、pandasとSQLiteを組み合わせてCSVデータを効率的に扱う方法を紹介しました。
CSVデータのデータベース化、データベースからのCSV出力、そして型推定による自動テーブル作成など、実務に役立つ実践的な内容を解説しました。
pandasとSQLiteの連携は、データサイエンスやバックエンド開発でも応用可能です。ぜひ、日々の業務やプロジェクトに取り入れてみてください!
コメント