21. Concurrent Programming with asyncio

The Asynchronous I/O (asyncio) Python library can be used with python-oracledb Thin mode for concurrent programming. This library allows you to run operations in parallel, for example to run a long-running operation in the background without blocking the rest of the application. With asyncio, you can easily write concurrent code with the async and await syntax. See Python’s Developing with asyncio documentation for useful tips.

The python-oracledb asynchronous API is a part of the standard python-oracledb module. All the synchronous methods that require a round-trip to the database now have corresponding asynchronous counterparts. You can choose whether to use the synchronous API or the asynchronous API in your code. Using both at the same time in one application is not recommended.

The asynchronous API classes are AsyncConnection, AsyncConnectionPool, AsyncCursor, and AsyncLOB.

Note

Concurrent programming with asyncio is only supported in the python-oracledb Thin mode.
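To illustrate the concurrency model described in this introduction, the following minimal sketch uses only the Python standard library. The coroutine simulated_round_trip() is a hypothetical stand-in for an awaited database call: it uses asyncio.sleep() instead of a real round-trip, so no database is needed to run it.

```python
import asyncio
import time

async def simulated_round_trip(name, delay):
    # asyncio.sleep() stands in for an awaited database round-trip
    await asyncio.sleep(delay)
    return name

async def main():
    start = time.perf_counter()
    # Schedule the three coroutines together; while one is waiting,
    # the event loop runs the others
    results = await asyncio.gather(
        simulated_round_trip("first", 0.2),
        simulated_round_trip("second", 0.2),
        simulated_round_trip("third", 0.2),
    )
    elapsed = time.perf_counter() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")
```

Because the three waits overlap, the total elapsed time should be close to the longest single delay rather than the sum of all three.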

21.1. Connecting to Oracle Database Asynchronously

With python-oracledb, you can create an asynchronous connection to Oracle Database using either standalone connections or pooled connections. (For discussion of synchronous programming, see Connecting to Oracle Database.)

21.1.1. Standalone Connections

Standalone connections are useful for applications that need only a single connection to a database.

An asynchronous standalone connection can be created by calling the asynchronous method oracledb.connect_async() which establishes a connection to the database and returns an AsyncConnection Object. Once connections are created, all objects created by these connections follow the asynchronous programming model. Subject to appropriate use of await for calls that require a round-trip to the database, asynchronous connections are used in the same way that synchronous programs use Standalone Connections.

Asynchronous connections should be released when they are no longer needed to ensure Oracle Database gracefully cleans up. A preferred method is to use an asynchronous context manager. For example:

import asyncio
import oracledb

async def main():

    async with oracledb.connect_async(user="hr", password=userpwd,
                                      dsn="localhost/orclpdb") as connection:
        with connection.cursor() as cursor:
            await cursor.execute("select user from dual")
            async for result in cursor:
                print(result)

asyncio.run(main())

This code ensures that once the block is completed, the connection is closed and resources are reclaimed by the database. In addition, any attempt to use the variable connection outside of the block will fail.

If you do not use a context manager, you should explicitly close connections when they are no longer needed, for example:

connection = await oracledb.connect_async(user="hr", password=userpwd,
                                          dsn="localhost/orclpdb")

cursor = connection.cursor()

await cursor.execute("select user from dual")
async for result in cursor:
    print(result)

cursor.close()
await connection.close()
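When closing explicitly, one possible way to make sure the connection is still released if a statement raises an exception is to wrap the work in try/finally. The following is a sketch only: the helper name fetch_current_user() is hypothetical, and it assumes it is given a connection that has already been created, for example with await oracledb.connect_async(...).

```python
async def fetch_current_user(connection):
    """Run a query, then always close the cursor and the connection."""
    try:
        cursor = connection.cursor()
        try:
            await cursor.execute("select user from dual")
            return await cursor.fetchone()
        finally:
            # Runs whether or not the query raised an exception
            cursor.close()
    finally:
        # Closing requires a round-trip, so it is awaited
        await connection.close()
```

The asynchronous context manager shown earlier gives the same guarantee for the connection with less code, which is why it is the preferred approach.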

21.1.2. Connection Pools

Connection pooling allows applications to create and maintain a pool of open connections to the database. Connection pooling is important for performance and scalability when applications need to handle a large number of users who do database work for short periods of time but have relatively long periods when the connections are not needed. The high availability features of pools also make small pools useful for applications that want a few connections available for infrequent use and require them to be immediately usable when acquired.

An asynchronous connection pool can be created by calling oracledb.create_pool_async() which returns an AsyncConnectionPool Object. Note that this method is synchronous and does not use await. Once the pool has been created, your application can get a connection from it by calling AsyncConnectionPool.acquire(). After your application has used a connection, it should be released back to the pool to make it available for other users. This can be done by explicitly closing the connection or by using an asynchronous context manager, for example:

import asyncio
import oracledb

async def main():

    pool = oracledb.create_pool_async(user="hr", password=userpwd,
                                      dsn="localhost/orclpdb",
                                      min=1, max=4, increment=1)

    async with pool.acquire() as connection:
        with connection.cursor() as cursor:
            await cursor.execute("select user from dual")
            async for result in cursor:
                print(result)

    await pool.close()

asyncio.run(main())
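The other option mentioned above, explicitly closing the connection, might look like the following sketch. The helper name query_with_explicit_release() is hypothetical, and the code assumes that the value returned by pool.acquire() can be awaited directly to obtain the connection; it expects a pool already created with oracledb.create_pool_async().

```python
async def query_with_explicit_release(pool):
    """Acquire a pooled connection, query, and hand the connection back."""
    # Assumption: awaiting the result of acquire() yields the connection
    connection = await pool.acquire()
    try:
        with connection.cursor() as cursor:
            await cursor.execute("select user from dual")
            return await cursor.fetchall()
    finally:
        # Closing a pooled connection releases it back to the pool
        await connection.close()
```

The try/finally block is a design choice so that a failing query does not leave the connection checked out of the pool.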

21.2. Executing SQL Using Asynchronous Methods

This section covers executing SQL using the asynchronous programming model. For discussion of synchronous programming, see Executing SQL.

Your application communicates with Oracle Database by executing SQL statements. Statements such as queries (statements beginning with SELECT or WITH), Data Manipulation Language (DML), and Data Definition Language (DDL) are executed using the asynchronous methods AsyncCursor.execute() or AsyncCursor.executemany(). Rows can be iterated over, or fetched using one of the methods AsyncCursor.fetchone(), AsyncCursor.fetchmany(), or AsyncCursor.fetchall().

You can also use shortcut methods on the AsyncConnection object, such as AsyncConnection.execute() or AsyncConnection.executemany(). Rows can be fetched using one of the shortcut methods AsyncConnection.fetchone(), AsyncConnection.fetchmany(), or AsyncConnection.fetchall().

An example of using AsyncConnection.fetchall():

import asyncio
import oracledb

async def main():

    async with oracledb.connect_async(user="hr", password=userpwd,
                                      dsn="localhost/orclpdb") as connection:
        res = await connection.fetchall("select * from locations")
        print(res)

asyncio.run(main())
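The shortcut methods can also be combined. The following sketch batches several inserts with AsyncConnection.executemany() and reads them back with AsyncConnection.fetchall(). The helper name insert_and_list_names() is hypothetical, and the table mytab with a single name column is assumed to exist already. The function expects an open asynchronous connection and does not commit.

```python
async def insert_and_list_names(connection, names):
    """Insert one row per name, then return all names in sorted order."""
    # executemany() takes one sequence of bind values per row
    rows = [(name,) for name in names]
    await connection.executemany(
        "insert into mytab (name) values (:1)", rows
    )
    # Uncommitted rows are visible to the session that inserted them
    return await connection.fetchall(
        "select name from mytab order by name"
    )
```

Neither call needs an explicit cursor, which keeps short scripts compact.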

An example that uses asyncio for parallelization and shows the execution of multiple coroutines:

import asyncio
import oracledb

# Number of coroutines to run
CONCURRENCY = 5

# Query the unique session identifier/serial number combination of a connection
SQL = """SELECT UNIQUE CURRENT_TIMESTAMP AS CT, sid||'-'||serial# AS SIDSER
         FROM v$session_connect_info
         WHERE sid = SYS_CONTEXT('USERENV', 'SID')"""

# Show the unique session identifier/serial number of each connection that the
# pool opens
async def init_session(connection, requested_tag):
    res = await connection.fetchone(SQL)
    print(res[0].strftime("%H:%M:%S.%f"), '- init_session with SID-SERIAL#', res[1])

# The coroutine simply shows the session identifier/serial number of the
# connection returned by the pool.acquire() call
async def query(pool):
    async with pool.acquire() as connection:
        await connection.callproc("dbms_session.sleep", [1])
        res = await connection.fetchone(SQL)
        print(res[0].strftime("%H:%M:%S.%f"), '- query with SID-SERIAL#', res[1])

async def main():

    pool = oracledb.create_pool_async(user="hr", password=userpwd,
                                      dsn="localhost/orclpdb",
                                      min=1, max=CONCURRENCY,
                                      session_callback=init_session)

    coroutines = [ query(pool) for i in range(CONCURRENCY) ]

    await asyncio.gather(*coroutines)

    await pool.close()

asyncio.run(main())

When you run this, you will see that multiple connections (identified by the unique Session Identifier and Serial Number combination) are opened and are used by query(). For example:

12:09:29.711525 - init_session with SID-SERIAL# 36-38096
12:09:29.909769 - init_session with SID-SERIAL# 33-56225
12:09:30.085537 - init_session with SID-SERIAL# 14-31431
12:09:30.257232 - init_session with SID-SERIAL# 285-40270
12:09:30.434538 - init_session with SID-SERIAL# 282-32608
12:09:30.730166 - query with SID-SERIAL# 36-38096
12:09:30.933957 - query with SID-SERIAL# 33-56225
12:09:31.115008 - query with SID-SERIAL# 14-31431
12:09:31.283593 - query with SID-SERIAL# 285-40270
12:09:31.457474 - query with SID-SERIAL# 282-32608

Your results may vary depending on how fast your environment is.

See async_gather.py for a runnable example.

21.3. Managing Transactions Using Asynchronous Methods

This section covers managing transactions using the asynchronous programming model. For discussion of synchronous programming, see Managing Transactions.

When AsyncCursor.execute() or AsyncCursor.executemany() executes a SQL statement, a transaction is started or continued. By default, python-oracledb does not commit this transaction to the database. The methods AsyncConnection.commit() and AsyncConnection.rollback() can be used to explicitly commit or roll back a transaction:

import asyncio
import oracledb

async def main():
    async with oracledb.connect_async(user="hr", password=userpwd,
                                      dsn="localhost/orclpdb") as connection:

        with connection.cursor() as cursor:
            await cursor.execute("INSERT INTO mytab (name) VALUES ('John')")
            await connection.commit()

asyncio.run(main())

When a database connection is closed, such as with AsyncConnection.close(), or when variables referencing the connection go out of scope, any uncommitted transaction will be rolled back.
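If the connection stays open after a failure, for example because it is reused for later work, you may prefer to roll back explicitly instead of relying on the rollback that happens at close. A minimal sketch, assuming an open asynchronous connection and the mytab table from the example above (the helper name insert_names() is hypothetical):

```python
async def insert_names(connection, names):
    """Insert all names in one transaction, or none of them."""
    try:
        with connection.cursor() as cursor:
            for name in names:
                await cursor.execute(
                    "insert into mytab (name) values (:1)", [name]
                )
        # Reached only if every insert succeeded
        await connection.commit()
    except Exception:
        # Undo any inserts made before the failure, then re-raise
        await connection.rollback()
        raise
```

Re-raising after the rollback lets the caller decide how to report or retry the failed operation.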

An alternative way to commit is to set the attribute AsyncConnection.autocommit of the connection to True. This ensures all DML statements (INSERT, UPDATE, and so on) are committed as they are executed.

Note that irrespective of the autocommit value, Oracle Database will always commit an open transaction when a DDL statement is executed.

When executing multiple DML statements that constitute a single transaction, it is recommended to use autocommit mode only for the last DML statement in the sequence of operations. Unnecessarily committing causes extra database load, and can destroy transactional consistency.
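The recommendation above could be applied as in the following sketch, which turns autocommit on only before the final statement of a two-statement transaction. The helper name archive_names() and the table mytab_archive are hypothetical, and the function assumes an open asynchronous connection with autocommit initially disabled.

```python
async def archive_names(connection):
    """Copy rows to an archive table and delete them as one transaction."""
    # First statement: the transaction starts and stays open
    await connection.execute(
        "insert into mytab_archive (name) select name from mytab"
    )
    # Last statement: enable autocommit so that this execution also
    # commits the whole transaction
    connection.autocommit = True
    try:
        await connection.execute("delete from mytab")
    finally:
        # Restore the default so later statements are not auto-committed
        connection.autocommit = False
```

Resetting the attribute in a finally block is a design choice for connections that are reused afterwards; it is not required by python-oracledb.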