Python/ETC

[SQLAlchemy] - Asynchronous I/O (asyncio) : 비동기 I/O (asyncio) 공식 문서 번역 1편 : 개요, 동시성 태스크와 같이 사용하기

Kani Kim 2024. 8. 4. 12:35

  1. 본 문서는 SQLAlchemy의 공식 문서를 번역한 것입니다. 
  2. 버전은 2.0.x 를 기준으로 합니다.
  3. 매주 일요일마다 올릴 예정이며, 번역에 오류가 있을 수 있습니다.
  4. https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html
 

Asynchronous I/O (asyncio) — SQLAlchemy 2.0 Documentation

>>> from __future__ import annotations >>> import asyncio >>> import datetime >>> from typing import List >>> from sqlalchemy import ForeignKey >>> from sqlalchemy import func >>> from sqlalchemy import select >>> from sqlalchemy.ext.asyncio import AsyncAt

docs.sqlalchemy.org

 

개요 - Core

 

핵심적인 사용을 위해 create_async_engine() 함수는 기존의 Engine API의 async(비동기) 버전에 해당하는 AsyncEngine 인스턴스를 만들고 제공합니다. 이 AsyncEngineAsyncEngine.connect()AsyncEngine.begin() 메소드를 통해 AsyncConnection을 전달합니다. 이 두 메소드 모두 비동기 컨텍스트 관리자를 제공합니다. 이 AsyncConnectionAsyncConnection.stream()을 활용해 스트리밍중인 서버 측의 AsyncResult를 전달하거나, AsyncConnection.execute() 메소드를 이용하여 저장되어(buffered) 있는 Result를 전달하여 실행문을 호출할 수 있습니다.

 

import asyncio

from sqlalchemy import Column
from sqlalchemy import MetaData
from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy.ext.asyncio import create_async_engine

meta = MetaData()
t1 = Table("t1", meta, Column("name", String(50), primary_key=True))


async def async_main() -> None:
    engine = create_async_engine("sqlite+aiosqlite://", echo=True)
    async with engine.begin() as conn:
        await conn.run_sync(meta.drop_all)
        await conn.run_sync(meta.create_all)
        await conn.execute(
            t1.insert(), [{"name": "some name 1"}, {"name": "some name 2"}]
        )
    async with engine.connect() as conn:
    	# Result를 선택, 저장된 상태로 전달되는
        # 결과들
        result = await conn.execute(select(t1).where(t1.c.name == "some name 1"))
        print(result.fetchall())
    # 함수 범위에서 생성된 AsyncEngine을 위해, 닫고서
    # 풀된(pooled) 연결들을 청소한다
    await engine.dispose()


asyncio.run(async_main())
BEGIN (implicit)
...
CREATE TABLE t1 (
    name VARCHAR(50) NOT NULL,
    PRIMARY KEY (name)
)
...
INSERT INTO t1 (name) VALUES (?)
[...] [('some name 1',), ('some name 2',)]
COMMIT
BEGIN (implicit)
SELECT t1.name
FROM t1
WHERE t1.name = ?
[...] ('some name 1',)
[('some name 1',)]
ROLLBACK

 

위에서 사용된 AsyncConnection.run_sync() 메소드는 특별한 DDL(Data Definition Language, 데이터 정의 언어), awaitable(*await 키워드를 사용 가능한 객체)를 포함하지 않는 MetaData.create_all()같은 것들을 호출하는데 주로 사용될 수 있습니다.

 

TIP

위의 예제 코드에서 async_main 함수를 사용한 것처럼, AsyncEngine 객체를 사용 후 범위에서 컨텍스트를 벗어나거나 Garbage Collect가 될 때는, AsyncEngine.dispose() 메소드를 await와 함께 사용하는 것을 권합니다. 이는 연결 풀(connection pool)내에서 열린 연결이 awaitable 컨텍스트 내에서 올바르게 폐기(disposse)되도록 보장합니다. 블로킹 IO를 사용할 때와는 다르게, SQLAlchemy은 __del__이나 약한 참조 파이널라이저(weakref finalizers) 메소드에서 await를 호출할 기회가 없기에, 적절하게 이런 연결들을 폐기할 수 없습니다. 엔진이 범위를 벗어날 때 명시적으로 폐기하지 않으면 가비지 컬렉션 중에 RuntimeError: Event loop is closed와 같은 경고가 표준 출력으로 발생할 수 있습니다.

 

AsyncConnection은 또한 AsyncResult 객체를 반환하는 AsyncConnection.stream() 메소드를 통한 "streaming" API 특징을 가지고 있습니다. 이 결과는 서버 측의 커서(cursor)를 사용하고 비동기 반복자(async iterator)와 같은 async/await API를 제공합니다.

async with engine.connect() as conn:
    async_result = await conn.stream(select(t1))

    async for row in async_result:
        print("row: %s" % (row,))

 

개요 - ORM

(SQLAlchemy) 2.0 스타일의 쿼리를 통해 AsyncSession 클래스는 완전한 ORM 기능을 제공합니다.

 

기본 모드를 사용함에 있어 지연 로딩(lazy loading)이나 기타 만료된 속성 접근을 피하기 위한 특별한 주의가 필요합니다. 다음 섹션인 AsyncSession 사용 시 암시적 IO 방지(Preventing Implicit IO when Using AsyncSession)에서 이에 대해 자세히 설명합니다.

 

Warning

하나의 AsyncSession 인스턴스는 여러개의 동시성 태스크들에 사용에 있어서 안전하지 않습니다. "동시성 태스크와 함께 AsyncSession 사용하기(Using AsyncSession with Concurrent Tasks)" 섹션과
"세션은 스레드 안전한가? AsyncSession은 동시성 태스크들 사이에서 공유하기에 안전한가?(Is the Session thread-safe? Is AsyncSession safe to share in concurrent tasks?)" 섹션을 미리 봐두세요.

 

from __future__ import annotations

import asyncio
import datetime
from typing import List

from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship
from sqlalchemy.orm import selectinload


class Base(AsyncAttrs, DeclarativeBase):
    pass

class B(Base):
    __tablename__ = "b"
    id: Mapped[int] = mapped_column(primary_key=True)
    a_id: Mapped[int] = mapped_column(ForeignKey("a.id"))
    data: Mapped[str]

class A(Base):
    __tablename__ = "a"
    id: Mapped[int] = mapped_column(primary_key=True)
    data: Mapped[str]
    create_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())
    bs: Mapped[List[B]] = relationship()

async def insert_objects(async_session: async_sessionmaker[AsyncSession]) -> None:
    async with async_session() as session:
        async with session.begin():
            session.add_all(
                [
                    A(bs=[B(data="b1"), B(data="b2")], data="a1"),
                    A(bs=[], data="a2"),
                    A(bs=[B(data="b3"), B(data="b4")], data="a3"),
                ]
            )


async def select_and_update_objects(
    async_session: async_sessionmaker[AsyncSession],
) -> None:
    async with async_session() as session:
        stmt = select(A).order_by(A.id).options(selectinload(A.bs))
        result = await session.execute(stmt)
        for a in result.scalars():
            print(a, a.data)
            print(f"created at: {a.create_date}")
            for b in a.bs:
                print(b, b.data)
        result = await session.execute(select(A).order_by(A.id).limit(1))
        a1 = result.scalars().one()
        a1.data = "new data"
        await session.commit()
        # 커밋한 다음에 속성에 접근, 이는
        # expire_on_commit=False로 인해 가능한 부분
        print(a1.data)
        # 대신, awaitable로서
        # AsyncAttrs가 어느 속성에나 접근 할 수 있도록 할 수 있다(2.0.13에서 새로 도입)
        for b1 in await a1.awaitable_attrs.bs:
            print(b1, b1.data)


async def async_main() -> None:
    engine = create_async_engine("sqlite+aiosqlite://", echo=True)
    # async_sessionmaker: AsyncSession 객체들을 만들어내기 위한 공장.
    # expire_on_commit - 트랙잭션 커밋 후에 객체를 만료시키지 않도록 한다
    async_session = async_sessionmaker(engine, expire_on_commit=False)
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    await insert_objects(async_session)
    await select_and_update_objects(async_session)
    # 함수 범위에서 생성된 AsyncEngine을 위해, 닫고서
    # 풀된(pooled) 연결들을 청소한다
    await engine.dispose()


asyncio.run(async_main())
BEGIN (implicit)
...
CREATE TABLE a (
    id INTEGER NOT NULL,
    data VARCHAR NOT NULL,
    create_date DATETIME DEFAULT (CURRENT_TIMESTAMP) NOT NULL,
    PRIMARY KEY (id)
)
...
CREATE TABLE b (
    id INTEGER NOT NULL,
    a_id INTEGER NOT NULL,
    data VARCHAR NOT NULL,
    PRIMARY KEY (id),
    FOREIGN KEY(a_id) REFERENCES a (id)
)
...
COMMIT
BEGIN (implicit)
INSERT INTO a (data) VALUES (?) RETURNING id, create_date
[...] ('a1',)
...
INSERT INTO b (a_id, data) VALUES (?, ?) RETURNING id
[...] (1, 'b2')
...
COMMIT
BEGIN (implicit)
SELECT a.id, a.data, a.create_date
FROM a ORDER BY a.id
[...] ()
SELECT b.a_id AS b_a_id, b.id AS b_id, b.data AS b_data
FROM b
WHERE b.a_id IN (?, ?, ?)
[...] (1, 2, 3)
<A object at ...> a1
created at: ...
<B object at ...> b1
<B object at ...> b2
<A object at ...> a2
created at: ...
<A object at ...> a3
created at: ...
<B object at ...> b3
<B object at ...> b4
SELECT a.id, a.data, a.create_date
FROM a ORDER BY a.id
LIMIT ? OFFSET ?
[...] (1, 0)
UPDATE a SET data=? WHERE a.id = ?
[...] ('new data', 1)
COMMIT
new data
<B object at ...> b1
<B object at ...> b2

 

위의 예시에서 AsyncSession은 선택적으로 사용되는 async_sessionmaker 도움자(helper)를 사용해 인스턴스화(instantiated)됩니다. 이 도움자는 고정된 매개변수 집합과 함께 새로운 AsyncSession 객체를 생성하는 공장을 제공하며, 여기서는 특정 데이터베이스 URL에 대한 AsyncEngine과 연결하는 것을 포함합니다. 이후 이 세션은 다른 메서드에 전달되어 Python 비동기 컨텍스트 관리자(예를들어, async with: 실행문)에서 사용될 수 있습니다. 이 경우 블록이 끝날 때 자동으로 닫히며, 이는 AsyncSession.close() 메서드를 호출하는 것과 동일합니다.

 

AsyncSession을 동시성 태스크와 같이 사용하기

AsyncSession가변적(mutable)이고 상태를 유지(stateful)하는 객체입니다. 이는 하나의 상태를 유지하는 데이터베이스의 진행 중인 트랜잭션을 표현합니다. asyncio와 함께 동시성 태스크를 사용하기 위해서는 (예를 들어 asyncio.gather()와 같은 API들) 각각의 태스크에다 분리된 AsyncSession을 사용해야 합니다.

 

"세션은 스레드 안전한가? AsyncSession은 동시성 태스크들 사이에서 공유하기에 안전한가?(Is the Session thread-safe? Is AsyncSession safe to share in concurrent tasks?)" 섹션을 통해 Session AsyncSession, 이 둘이 어떻게 동시성 워크로드에 대해서 어떻게 사용되어야 하는지와 함께 일반적인 설명을 보세요.

 

 

728x90
반응형