StreamlitのLLM呼び出しが遅い?並列処理で爆速化する3つの方法を比較してみた

修論まだ終わってないのに…🥲🥲🥲🥲💦💦💦💦

現在、絶賛修論執筆中の身ですが、修論の研究でStreamlitでLLMを使った実験用のアプリを作っていたときに 「複数のLLM APIを呼び出して、結果が返ってくるの遅くない?」から始まった技術検証の話を書きます。

複数のプロンプトをLLMに投げる処理があると、どうしても待ち時間が気になるんですよね。1回のAPI呼び出しに1秒かかるとして、5回呼び出したら5秒。これ、並列にしたら1秒ちょっとで終わるんじゃないか?と。

というわけで、今回は順次処理スレッド並列(ThreadPoolExecutor)非同期並列(asyncio) の3パターンで実装して、実際にどれくらい速度差が出るのか検証してみました。

この記事でわかること!

  • Streamlitの基本的な使い方(初心者向けにすこし解説)
  • concurrent.futures.ThreadPoolExecutor による並列化
  • asyncio + aiohttp による非同期処理
  • 実際の速度比較

環境構築

プロジェクトの作成

今回は uv を使ってプロジェクトを作成します。

# プロジェクトディレクトリ作成
$ mkdir streamlit-parallel-demo
$ cd streamlit-parallel-demo

# 仮想環境の作成
$ uv venv

# 仮想環境の有効化
$ source .venv/bin/activate

# 依存パッケージのインストール
$ uv pip install streamlit openai python-dotenv aiohttp

⚠️uvを使っていない場合は、普通に仮想環境を作ってもOKです。python -m venv .venv で仮想環境を作成し、pip install streamlit openai python-dotenv aiohttp で大丈夫。ただ、最近はuvがかなり速くて便利なので、まだ使ったことがない方はこの機会に試してみるのもいいかもしれません。

インストールするパッケージの説明

パッケージ 役割
streamlit WebアプリのUIフレームワーク
openai OpenAI APIの公式クライアント(順次処理・スレッド並列で使用)
python-dotenv .envファイルから環境変数を読み込む
aiohttp 非同期HTTPクライアント(asyncioパターンで使用)

👉️aiohttpは非同期でHTTPリクエストを送れるライブラリです。OpenAIの公式クライアントは同期処理向けなので、asyncioで並列化する場合はaiohttpを使って直接APIを叩く必要があります。ちょっと手間が増えますが、大量のリクエストを効率的に処理できます。

モデルとAPIキーの設定

プロジェクトルートに .env ファイルを作成します。

# .env
OPENAI_API_KEY=sk-your-api-key-here
OPENAI_MODEL=gpt-5-mini

⚠️.env ファイルは必ず .gitignore に追加してください。APIキーをGitHubに上げてしまうと大変なことになります。経験者は語る…ではないですが、よく聞く話です🥲

ディレクトリ構成

最終的にはこんな感じになります。

streamlit-parallel-demo/
├── .env
├── app_sequential.py      # パターン1: 順次処理
├── app_threaded.py        # パターン2: スレッド並列
├── app_async.py           # パターン3: 非同期並列
└── app_allpattern.py      # 3つを比較するアプリ

Streamlitの基本(初心者向け)

Streamlitを触るのが初めての方もいると思うので、ここで簡単に基本を押さえておきます。

Streamlitとは?

Pythonスクリプトを書くだけでWebアプリが作れるフレームワークです。HTMLもCSSも書かなくていいのが最高なんですよね。データサイエンス界隈でよく使われていますが、ちょっとしたデモアプリを作るのにも便利です。

基本的な書き方

app_sample.pyというファイルを作成して、以下のコードを書いてみてください。

import streamlit as st

st.title("こんにちは")
st.write("これがStreamlitです")

if st.button("クリック"):
    st.write("ボタンが押されました!")

これだけでWebアプリになります。驚きです🤗

実行方法

$ streamlit run app_sample.py

👉️Streamlitは「スクリプト全体が上から下まで再実行される」という特徴があります。ボタンを押すたびに全部実行されます。これ、ちょっと戸惑うかもしれませんが、慣れると逆にシンプルでいいなと思えてきます。


LLM APIの並列化を実験してみる

パターン1: 順次処理(Sequential)

まずは基本形。何も工夫せず、順番にAPIを呼び出すパターンです。

ソースコード

app_sequential.py

# app_sequential.py
import os
import time
import streamlit as st
from openai import OpenAI
from dotenv import load_dotenv

# 環境変数の読み込み
load_dotenv()

# 設定
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-5-mini")

# OpenAIクライアント
client = OpenAI(api_key=OPENAI_API_KEY)

# テスト用のプロンプト
PROMPTS = [
    "日本の首都を一言で答えてください。",
    "1+1の答えを一言で答えてください。",
    "水の化学式を一言で答えてください。",
    "地球から月までの距離を概算で一言で答えてください。",
    "Pythonの特徴を一言で答えてください。",
]


def call_llm(prompt: str, index: int) -> dict:
    """LLMを呼び出す(1回分)"""
    start = time.time()
    response = client.chat.completions.create(
        model=OPENAI_MODEL,
        messages=[{"role": "user", "content": prompt}],
        max_completion_tokens=500,
        reasoning_effort="low",  # 推論を最小限に抑える
    )
    elapsed = time.time() - start
    return {
        "index": index,
        "prompt": prompt,
        "response": response.choices[0].message.content,
        "time": elapsed,
    }


def run_sequential(prompts: list) -> tuple[list, float]:
    """順次処理で全プロンプトを実行"""
    results = []
    total_start = time.time()

    for i, prompt in enumerate(prompts):
        result = call_llm(prompt, i)
        results.append(result)

    total_time = time.time() - total_start
    return results, total_time


# ===== Streamlit UI =====
st.set_page_config(page_title="順次処理デモ", page_icon="🐢")
st.title("順次処理デモ")
st.write(f"モデル: `{OPENAI_MODEL}` / プロンプト数: {len(PROMPTS)}件")

st.divider()

if st.button("実行", type="primary"):
    with st.spinner("順次処理で実行中..."):
        results, total_time = run_sequential(PROMPTS)

    # 結果表示
    st.metric("総実行時間", f"{total_time:.2f} 秒")

    st.subheader("結果一覧")
    for r in results:
        with st.expander(f"Q{r['index']+1}: {r['prompt'][:20]}... ({r['time']:.2f}秒)"):
            st.write(f"**回答:** {r['response']}")

実行する

$ streamlit run app_sequential.py

ブラウザで http://localhost:8501 が開きます。「実行」ボタンを押すと、5つのプロンプトが順番に処理されて、だいたい 10秒 くらいかかるはずです。


パターン2: ThreadPoolExecutor(スレッド並列)

次は concurrent.futures.ThreadPoolExecutor を使った並列処理です。これが一番導入しやすいと思います。

ソースコード

app_threaded.py

# app_threaded.py
import os
import time
import streamlit as st
from concurrent.futures import ThreadPoolExecutor, as_completed
from openai import OpenAI
from dotenv import load_dotenv

# 環境変数の読み込み
load_dotenv()

# 設定
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-5-mini")

# OpenAIクライアント
client = OpenAI(api_key=OPENAI_API_KEY)

# テスト用のプロンプト
PROMPTS = [
    "日本の首都を一言で答えてください。",
    "1+1の答えを一言で答えてください。",
    "水の化学式を一言で答えてください。",
    "地球から月までの距離を概算で一言で答えてください。",
    "Pythonの特徴を一言で答えてください。",
]


def call_llm(prompt: str, index: int) -> dict:
    """LLMを呼び出す(1回分)"""
    start = time.time()
    response = client.chat.completions.create(
        model=OPENAI_MODEL,
        messages=[{"role": "user", "content": prompt}],
        max_completion_tokens=500,
        reasoning_effort="low",  # 推論を最小限に抑える
    )
    elapsed = time.time() - start
    return {
        "index": index,
        "prompt": prompt,
        "response": response.choices[0].message.content,
        "time": elapsed,
    }


def run_threaded(prompts: list, max_workers: int = 5) -> tuple[list, float]:
    """ThreadPoolExecutorで並列処理"""
    results = []
    total_start = time.time()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # 全タスクを投入
        futures = {
            executor.submit(call_llm, prompt, i): i
            for i, prompt in enumerate(prompts)
        }

        # 完了したものから結果を取得
        for future in as_completed(futures):
            results.append(future.result())

    # インデックス順にソート(完了順がバラバラなので)
    results.sort(key=lambda x: x["index"])
    total_time = time.time() - total_start
    return results, total_time


# ===== Streamlit UI =====
st.set_page_config(page_title="スレッド並列デモ")
st.title("スレッド並列デモ(ThreadPoolExecutor)")
st.write(f"モデル: `{OPENAI_MODEL}` / プロンプト数: {len(PROMPTS)}件")

st.divider()

# ワーカー数の設定
max_workers = st.slider("並列数(max_workers)", min_value=1, max_value=10, value=5)

if st.button("実行", type="primary"):
    with st.spinner(f"スレッド並列({max_workers}並列)で実行中..."):
        results, total_time = run_threaded(PROMPTS, max_workers)

    # 結果表示
    st.metric("総実行時間", f"{total_time:.2f} 秒")

    st.subheader("結果一覧")
    for r in results:
        with st.expander(f"Q{r['index']+1}: {r['prompt'][:20]}... ({r['time']:.2f}秒)"):
            st.write(f"**回答:** {r['response']}")

変更したポイント!

ポイントはここです。

with ThreadPoolExecutor(max_workers=max_workers) as executor:
    futures = {
        executor.submit(call_llm, prompt, i): i
        for i, prompt in enumerate(prompts)
    }
    for future in as_completed(futures):
        results.append(future.result())

executor.submit() でタスクを投入すると、スレッドプールが自動的に並列実行してくれます。as_completed() は完了したものから順に取得できる便利な関数です。

実行する

$ streamlit run app_threaded.py

同じ5つのプロンプトが、今度は 3-4秒 くらいで終わるはずです。体感でもかなり速くなったのがわかると思います。 どちらかというと一番時間の長く生成にかかったものの時間に近い感じになります。3倍以上の速度向上ですね🤩

👉️max_workers の値は、API側のレートリミットも考慮して設定しましょう。あまり大きくしすぎると 429 Too Many Requests エラーが返ってくることがあります。


パターン3: asyncio + aiohttp(非同期並列)

最後は asyncio を使った非同期処理です。これが一番効率的ですが、コードは少し複雑になります。

ソースコード

# app_async.py
import os
import time
import asyncio
import aiohttp
import streamlit as st
from dotenv import load_dotenv

# 環境変数の読み込み
load_dotenv()

# 設定
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-5-mini")

# テスト用のプロンプト
PROMPTS = [
    "日本の首都を一言で答えてください。",
    "1+1の答えを一言で答えてください。",
    "水の化学式を一言で答えてください。",
    "地球から月までの距離を概算で一言で答えてください。",
    "Pythonの特徴を一言で答えてください。",
]


async def call_llm_async(
    session: aiohttp.ClientSession,
    prompt: str,
    index: int
) -> dict:
    """非同期でLLMを呼び出す"""
    start = time.time()

    url = "https://api.openai.com/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {OPENAI_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": OPENAI_MODEL,
        "messages": [{"role": "user", "content": prompt}],
        "max_completion_tokens": 500,
        "reasoning_effort": "low",  # 推論を最小限に抑える
    }

    async with session.post(url, headers=headers, json=payload) as response:
        data = await response.json()
        elapsed = time.time() - start

        # エラーチェック
        if "error" in data:
            return {
                "index": index,
                "prompt": prompt,
                "response": f"エラー: {data['error']['message']}",
                "time": elapsed,
            }

        return {
            "index": index,
            "prompt": prompt,
            "response": data["choices"][0]["message"]["content"],
            "time": elapsed,
        }


async def run_async_all(prompts: list) -> tuple[list, float]:
    """asyncioで全プロンプトを並列実行"""
    total_start = time.time()

    async with aiohttp.ClientSession() as session:
        tasks = [
            call_llm_async(session, prompt, i)
            for i, prompt in enumerate(prompts)
        ]
        results = await asyncio.gather(*tasks)

    results = list(results)
    results.sort(key=lambda x: x["index"])
    total_time = time.time() - total_start
    return results, total_time


def run_async(prompts: list) -> tuple[list, float]:
    """Streamlitから呼び出すためのラッパー"""
    return asyncio.run(run_async_all(prompts))


# ===== Streamlit UI =====
st.set_page_config(page_title="非同期並列デモ")
st.title("非同期並列デモ(asyncio + aiohttp)")
st.write(f"モデル: `{OPENAI_MODEL}` / プロンプト数: {len(PROMPTS)}件")

st.divider()

if st.button("実行", type="primary"):
    with st.spinner("非同期並列で実行中..."):
        results, total_time = run_async(PROMPTS)

    # 結果表示
    st.metric("総実行時間", f"{total_time:.2f} 秒")

    st.subheader("結果一覧")
    for r in results:
        with st.expander(f"Q{r['index']+1}: {r['prompt'][:20]}... ({r['time']:.2f}秒)"):
            st.write(f"**回答:** {r['response']}")

ThreadPoolExecutorとの違い

違いは2つあります。

  • OpenAI公式クライアントが使えない … aiohttp を使って自分でHTTPリクエストを書く必要があります。これがちょっと面倒なところ。
  • async/await の構文が必要 … 非同期処理の基本的な書き方を理解しておく必要があります。Pythonの非同期処理に慣れていないと少しとっつきにくいかもしれません。

以下の部分が非同期処理のキモです。

async def call_llm_async(session, prompt, index):
    async with session.post(url, headers=headers, json=payload) as response:
        data = await response.json()
        return {...}

⚠️asyncio.run() はイベントループを新しく作成します。Streamlitは内部でイベントループを使っている場合があるので、環境によってはうまく動かないことがあります。その場合は ThreadPoolExecutor を使う方が安全です。

実行する

$ streamlit run app_async.py

速度は ThreadPoolExecutor とほぼ同じか、若干速いくらいです。大量のリクエスト(数十〜数百)を投げる場合は、asyncio の方がメモリ効率が良いです。


3パターンを比較するアプリ

せっかくなので、3つのパターンを並べて比較できるアプリも作ってみました。

ソースコード

app_allpattern.py

# app_allpattern.py
import os
import time
import asyncio
import aiohttp
import streamlit as st
from concurrent.futures import ThreadPoolExecutor, as_completed
from openai import OpenAI
from dotenv import load_dotenv

# 環境変数の読み込み
load_dotenv()

# 設定
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-5-mini")

# OpenAIクライアント(同期用)
client = OpenAI(api_key=OPENAI_API_KEY)

# テスト用のプロンプト
PROMPTS = [
    "日本の首都を一言で答えてください。",
    "1+1の答えを一言で答えてください。",
    "水の化学式を一言で答えてください。",
    "地球から月までの距離を概算で一言で答えてください。",
    "Pythonの特徴を一言で答えてください。",
]


# ============================================
# パターン1: 順次処理
# ============================================
def call_llm_sync(prompt: str, index: int) -> dict:
    """同期的にLLMを呼び出す"""
    start = time.time()
    response = client.chat.completions.create(
        model=OPENAI_MODEL,
        messages=[{"role": "user", "content": prompt}],
        max_completion_tokens=500,
        reasoning_effort="low",  # 推論を最小限に抑える
    )
    elapsed = time.time() - start
    return {
        "index": index,
        "prompt": prompt,
        "response": response.choices[0].message.content,
        "time": elapsed,
    }


def run_sequential(prompts: list) -> tuple[list, float]:
    """順次処理"""
    results = []
    total_start = time.time()
    for i, prompt in enumerate(prompts):
        result = call_llm_sync(prompt, i)
        results.append(result)
    total_time = time.time() - total_start
    return results, total_time


# ============================================
# パターン2: ThreadPoolExecutor
# ============================================
def run_threaded(prompts: list, max_workers: int = 5) -> tuple[list, float]:
    """スレッド並列処理"""
    results = []
    total_start = time.time()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(call_llm_sync, prompt, i): i
            for i, prompt in enumerate(prompts)
        }
        for future in as_completed(futures):
            results.append(future.result())
    results.sort(key=lambda x: x["index"])
    total_time = time.time() - total_start
    return results, total_time


# ============================================
# パターン3: asyncio + aiohttp
# ============================================
async def call_llm_async(
    session: aiohttp.ClientSession,
    prompt: str,
    index: int
) -> dict:
    """非同期でLLMを呼び出す"""
    start = time.time()
    url = "https://api.openai.com/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {OPENAI_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": OPENAI_MODEL,
        "messages": [{"role": "user", "content": prompt}],
        "max_completion_tokens": 500,
        "reasoning_effort": "low",  # 推論を最小限に抑える
    }
    async with session.post(url, headers=headers, json=payload) as response:
        data = await response.json()
        elapsed = time.time() - start
        if "error" in data:
            return {
                "index": index,
                "prompt": prompt,
                "response": f"エラー: {data['error']['message']}",
                "time": elapsed,
            }
        return {
            "index": index,
            "prompt": prompt,
            "response": data["choices"][0]["message"]["content"],
            "time": elapsed,
        }


async def run_async_all(prompts: list) -> tuple[list, float]:
    """asyncio並列処理"""
    total_start = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [call_llm_async(session, prompt, i) for i, prompt in enumerate(prompts)]
        results = await asyncio.gather(*tasks)
    results = list(results)
    results.sort(key=lambda x: x["index"])
    total_time = time.time() - total_start
    return results, total_time


def run_async(prompts: list) -> tuple[list, float]:
    """Streamlit用ラッパー"""
    return asyncio.run(run_async_all(prompts))


# ============================================
# 結果表示用の関数
# ============================================
def display_results(results: list, total_time: float, method_name: str):
    """結果を表示"""
    st.metric(f"{method_name}", f"{total_time:.2f} 秒")
    for r in results:
        with st.expander(f"Q{r['index']+1}: ({r['time']:.2f}秒)"):
            st.caption(r['prompt'])
            st.write(r['response'])


# ============================================
# Streamlit UI
# ============================================
st.set_page_config(page_title="LLM並列処理比較", layout="wide")
st.title("⚡ LLM API 並列処理 速度比較")
st.write(f"モデル: `{OPENAI_MODEL}` / プロンプト数: {len(PROMPTS)}件")

st.divider()

# 実行方法の選択
method = st.radio(
    "実行方法を選択",
    ["順次処理", "ThreadPoolExecutor", "asyncio + aiohttp", "全て比較"],
    horizontal=True,
)

if st.button("実行", type="primary"):

    if method == "順次処理":
        with st.spinner("順次処理で実行中..."):
            results, total_time = run_sequential(PROMPTS)
        display_results(results, total_time, "順次処理")

    elif method == "ThreadPoolExecutor":
        with st.spinner("スレッド並列で実行中..."):
            results, total_time = run_threaded(PROMPTS)
        display_results(results, total_time, "ThreadPoolExecutor")

    elif method == "asyncio + aiohttp":
        with st.spinner("非同期並列で実行中..."):
            results, total_time = run_async(PROMPTS)
        display_results(results, total_time, "asyncio + aiohttp")

    else:  # 全て比較
        st.subheader("3パターン同時実行")

        col1, col2, col3 = st.columns(3)
        times = {}

        with col1:
            st.markdown("### 順次処理")
            with st.spinner("実行中..."):
                results1, time1 = run_sequential(PROMPTS)
            times["順次処理"] = time1
            display_results(results1, time1, "順次処理")

        with col2:
            st.markdown("### ThreadPool")
            with st.spinner("実行中..."):
                results2, time2 = run_threaded(PROMPTS)
            times["ThreadPool"] = time2
            display_results(results2, time2, "ThreadPool")

        with col3:
            st.markdown("### asyncio")
            with st.spinner("実行中..."):
                results3, time3 = run_async(PROMPTS)
            times["asyncio"] = time3
            display_results(results3, time3, "asyncio")

        # 比較結果
        st.divider()
        st.subheader("実行時間比較")

        # 棒グラフ
        st.bar_chart(times)

        # 速度向上率
        if time1 > 0:
            st.success(
                f"**速度向上**: "
                f"ThreadPool **{time1/time2:.1f}倍** / "
                f"asyncio **{time1/time3:.1f}倍**"
            )

        # 補足情報
        st.info(
            "**補足**: 並列処理はAPI待ち時間を重ねられるため高速化できます。"
            "ただし、APIのレートリミットには注意が必要です。"
        )

実行する

$ streamlit run app_allpattern.py

【全て比較】を選んで【実行】ボタンをクリックすると、3つのパターンが横並びで実行され比較できます。


実行結果の例

5つのプロンプトを実行した場合の典型的な結果です(ネットワーク環境により変動します)

方式 速度向上
順次処理 1.0倍(基準)
ThreadPoolExecutor 約4.0倍
asyncio + aiohttp 約4.7倍

プロンプト数が増えれば増えるほど、差は開いていきます。


どれを選ぶべき?

個人的な結論としては、こんな感じでしょうか?

ユースケース おすすめ
プロトタイプ・少量のリクエスト 順次処理
既存コードの改修・中規模 ThreadPoolExecutor ← おすすめ
大量リクエスト・本番環境 asyncio + aiohttp

正直なところ、ThreadPoolExecutor が一番バランスいいと思います。導入が簡単で、OpenAIの公式クライアントがそのまま使えて、十分な速度向上が得られます🤗


注意点

OpenAI APIにはレートリミットがあります。並列数を増やしすぎると 429 Too Many Requests が返ってきます。

# 控えめに設定するのが安全
with ThreadPoolExecutor(max_workers=3) as executor:
    ...

また、並列処理では、1つの失敗が全体に影響しないようにしておくと安心です。

def call_llm_safe(prompt: str, index: int) -> dict:
    try:
        return call_llm_sync(prompt, index)
    except Exception as e:
        return {"index": index, "prompt": prompt, "response": f"エラー: {e}", "time": 0}

おわりに

というわけで、StreamlitでLLM APIを並列化する方法を3パターン試してみました。

  • 順次処理から並列処理に変えるだけで 3倍の速度向上 が期待できる
  • ThreadPoolExecutor は導入が簡単で効果が高い
  • asyncio は大量リクエスト向けだが、コードが複雑になる
  • ぜひ皆さんも試してみてください!修論執筆中の私も頑張ります…🥲💦