Think Twice
IT技術メモ | Pythonのメモ
Created: 2021-10-24 / Updated: 2021-10-24

pyodbcによるデータベース操作


目次


pyodbcのインストール

インストールはpipにて行うのが簡単です。

Copy
pip install pyodbc

コネクション取得

コネクションはpyodbc.connect()で取得します。
ここでは参考にSQL Serverを使った例を紹介します。

Copy
import pyodbc

# ODBCドライバ, サーバ名, データベース名, ユーザID, パスワードなどを指定します
cnxn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost;DATABASE=testdb;UID=me;PWD=pass')
# DNSを使う場合はこんな感じ
cnxn = pyodbc.connect('DSN=test;PWD=password')

# コネクションからカーソルを作成します
cursor = cnxn.cursor()

SQLクエリの実行

SQLクエリを実行するには、カーソルのexecute()を使います。

Copy
cursor.execute("<SQLクエリ>")

SELECT

SELECTの実行

カーソルのexecute()にてSELECT文を実行します。

Copy
cursor.execute("select * from M_USER")

フェッチ処理

executeした後に、fetch系メソッドを使ってレコードを取り出します。
fetch系メソッドには以下の種類があります。

メソッド名 説明 使用例
fetchall SQLクエリの結果を全レコード取得。1件も取得できないときは空リストが返却される。 rows = cursor.fetchall()
fetchmany SQLクエリの結果を指定したレコード数づつ順次取得。1件も取得できないときは空リストが返却される。 rows = cursor.fetchmany()
fetchone SQLクエリの結果を1レコードづつ順次取得。取得できないときはNoneが返却される。 row = cursor.fetchone()
fetchval 最初の行の最初の値を取得 count(*) as cnt みたいなときに便利。fetchone()[0]と同じ。 value = cursor.fetchval()

1件ずつ取得

fetchone()は全件取得するとNoneを返すので、以下のように書くと1件ずつ全件処理できます。

1件ずつ処理
Copy
while True:
    row = cursor.fetchone()
    if not row:
        break
    print('id:', row.user_id)

全件取得

fetchall()で取得結果を全件処理します。

全件処理
Copy
cursor.execute("select user_id, user_name from users")
rows = cursor.fetchall()
for row in rows:
    print(row.user_id, row.user_name)

cursor.close()

rowのデータを取得するには、インデックスを使う方法とカラム名を使う方法があります。

データの取り出し
Copy
print('name:', row[1])         # インデックスで結果を取得(0始まり)
print('name:', row.user_name)  # 名前で結果を取得

もし、取得結果を使いまわす予定がないなら、カーソルがイテレータになっているため、そのまま以下のようにすることもできます。

イテレータで回す
Copy
cursor.execute("select user_id, user_name from users"):
for row in cursor:
    print(row.user_id, row.user_name)

executeはカーソルを返すので、さらに短く書けます。

イテレータで回す(もっと短く)
Copy
for row in cursor.execute("select user_id, user_name from users"):
    print(row.user_id, row.user_name)

指定件数ずつ取得

fetchall()だと件数が多いとメモリを大量に消費してしまうので、fetchmany()で件数を絞って取得することでメモリを節約できます。

1000件ずつ取得し処理
Copy
cursor.execute("select user_id, user_name from users")

row_cnt = 1000
rows = cursor.fetchmany(row_cnt)
while len(rows) > 0:
    for row in rows:
        print(row.user_id, row.user_name)
    # 次を取得
    rows = cursor.fetchmany(row_cnt)

cursor.close()

パラメータ

?を使います。

Copy
cursor.execute("""
    select user_id, user_name
      from users
     where last_logon < ?
       and bill_overdue = ?
""", datetime.date(2001, 1, 1), 'y')

パラメータを渡す部分はこういう書き方もできます。

Copy
cursor.execute("""
    select user_id, user_name
      from users
     where last_logon < ?
       and bill_overdue = ?
""", [datetime.date(2001, 1, 1), 'y'])

INSERT

カーソルのexecute()にてINSERT文を実行します。
処理結果の件数は、カーソルの.rowcountで取得できます。
実行後は、コネクションのcommit()もしくはrollback()を実行して、処理結果をDBへ確定させます。

Copy
cursor.execute("insert into products(id, name) values ('pyodbc', 'awesome library')")
cnxn.commit()

INSERT時は通常処理件数は1件ですが、複数件INSERTした場合などはその件数が.rowcountで取得できます。

Copy
cursor = cnxn.cursor()
cursor.execute("""
    insert into [dbo].[User]
    values ('A011', 'あああ', 10, null, 11, 'C'),
           ('A012', 'いいい', 10, null, 11, 'C'),
           ('A013', 'ううう', 10, null, 11, 'C')
""")
print(cursor.rowcount) # => 3
cnxn.commit()

パラメータも使えます。

Copy
cursor.execute("insert into products(id, name) values (?, ?)", 'pyodbc', 'awesome library')
cnxn.commit()

UPDATE

カーソルのexecute()にてUPDATE文を実行します。
処理結果の件数は、カーソルの.rowcountで取得できます。
実行後は、コネクションのcommit()もしくはrollback()を実行して、処理結果をDBへ確定させます。

Copy
cursor = cnxn.cursor()
cursor.execute("""
    update [dbo].[User]
    set BillingAmount = BillingAmount * 1.01
    where BillingAmount is not null
""")
print(cursor.rowcount)
cnxn.commit()

execute()はcursor自身を返すので、以下のように.rowcountで終わるような書き方もできます。

Copy
cursor = cnxn.cursor()
upd_count = cursor.execute("""
    update [dbo].[User]
    set BillingAmount = BillingAmount * 1.01
    where BillingAmount is not null
""").rowcount
print(upd_count)
cnxn.commit()

DELETE

カーソルのexecute()にてDELETE文を実行します。
処理結果の件数は、カーソルの.rowcountで取得できます。
実行後は、コネクションのcommit()もしくはrollback()を実行して、処理結果をDBへ確定させます。

Copy
cursor = cnxn.cursor()
cursor.execute("delete from products where id <> ?", 'pyodbc')
print(cursor.rowcount, 'products deleted')
cnxn.commit()

オートコミット

pyodbcではデフォルトでオートコミットはFalseとなっています。

Note, this whole article is relevant only when autocommit is set to False on the pyodbc connection (False is the default).

Database Transaction Management

もし明示的にTrueにする場合、以下のようにします。

取得時にTrueにする
Copy
cnxn = pyodbc.connect(conn_str, autocommit=True)
取得後にTrueにする
Copy
cnxn.autocommit = False  # enable transactions

トランザクション

トランザクションはオートコミットFalseの時に利用できます。
トランザクション関連のメソッドは以下。

メソッド名 説明
commit トランザクションをコミットします。
rollback トランザクションをコミットします。
close コネクションをクローズします。

トランザクションの分離レベル

ODBCでは、以下4つの分離レベルがサポートされています。

トランザクションの分離レベル
SQL_TXN_READ_UNCOMMITTED
SQL_TXN_READ_COMMITTED
SQL_TXN_REPEATABLE_READ
SQL_TXN_SERIALIZABLE

トランザクションの分離を設定するには、コネクションの.set_attrを使ってpyodbc.SQL_ATTR_TXN_ISOLATIONに設定します。

Copy
cnxn = pyodbc.connect(conn_str, autocommit=True)
cnxn.set_attr(pyodbc.SQL_ATTR_TXN_ISOLATION, pyodbc.SQL_TXN_SERIALIZABLE)
cnxn.autocommit = False  # enable transactions

Tips

SQLクエリを書く時

SQLではシングルクォート(')が文字列の括りに使われるので、SQL文を括るにはダブルクォート(")を使うとよいです。

ダブルクォートで括る
Copy
deleted = cursor.execute("delete from products where id <> 'pyodbc'").rowcount

名前付きカラム

asで名前を付けると、rowからプロパティのようにアクセス可能です。

asで名前を付けて取得
Copy
row = cursor.execute("select count(*) as user_count from users").fetchone()
print('%s users' % row.user_count)

dedent

textwrapdedentを使うと、SQLの前後の余分なスペースを除去してくれます。お好みで。

Copy
import textwrap
sql = textwrap.dedent("""
    select p.date_of_birth,
           p.email,
           a.city
    from person as p
    left outer join address as a on a.address_id = p.address_id
    where p.status = 'active'
      and p.name = ?
""")
rows = cursor.execute(sql, 'John Smith').fetchall()


参考

参照

参考サイト