QT2 Backtrader学习笔记

1. 示例代码1

backtrader的示例代码,如果标的价格昨天降了则买,昨天涨了则卖,标的价格随机生成,模拟一年的数据

import pandas as pd
import numpy as np
import backtrader as bt

# 生成一年的随机价格数据(交易日)
dates = pd.date_range(start="2020-01-01", periods=252, freq="B")  # 252 个交易日
# 随机价格生成:起始价 100,然后累积正态分布波动
price = 100 + np.random.randn(len(dates)).cumsum()
# 构造 DataFrame
df = pd.DataFrame(index=dates)
df["close"] = price
df["open"] = df["close"].shift(1).fillna(df["close"][0])
# high/low 加入小幅随机波动
df["high"] = df[["open", "close"]].max(axis=1) * (1 + np.random.rand(len(df)) * 0.01)
df["low"] = df[["open", "close"]].min(axis=1) * (1 - np.random.rand(len(df)) * 0.01)
# 随机成交量
np.random.seed(42)
df["volume"] = np.random.randint(100, 1000, size=len(df))


# 定义策略:如果昨天收盘价比前天低则买入,昨天收盘价比前天高则卖出
class BuyOnDownSellOnUp(bt.Strategy):
    def next(self):
        # 确保至少有前一天数据
        if len(self.data.close) < 2:
            return
        today_close = self.data.close[0]
        yesterday_close = self.data.close[-1]

        # 下跌买入
        if today_close < yesterday_close:
            if not self.position:
                self.buy(size=1)
                print(f"{self.data.datetime.date(0)} BUY at {today_close:.2f}")
        # 上涨卖出
        elif today_close > yesterday_close:
            if self.position:
                self.sell(size=1)
                print(f"{self.data.datetime.date(0)} SELL at {today_close:.2f}")


if __name__ == "__main__":
    cerebro = bt.Cerebro()
    # 加载数据
    datafeed = bt.feeds.PandasData(dataname=df)
    cerebro.adddata(datafeed)

    # 添加策略
    cerebro.addstrategy(BuyOnDownSellOnUp)
    # 设置初始资金
    cerebro.broker.setcash(100000.0)

    # 运行回测
    print("Starting Portfolio Value: %.2f" % cerebro.broker.getvalue())
    cerebro.run()
    print("Final Portfolio Value: %.2f" % cerebro.broker.getvalue())

运行能看到所有交易行为以及最后的资产情况

2. 示例代码2

再加入一只股票(也随机),每天开盘买入昨天下跌幅度最大的一支股票(如果都没跌则不买),并且在收盘进行清仓

import pandas as pd
import numpy as np
import backtrader as bt

# 生成一年的随机价格数据(交易日)
dates = pd.date_range(start="2020-01-01", periods=252, freq="B")


# 辅助函数:根据随机种子生成单只股票的OHLCV数据
def make_df(seed):
    np.random.seed(seed)
    price = 100 + np.random.randn(len(dates)).cumsum()
    df = pd.DataFrame(index=dates)
    df["close"] = price
    df["open"] = df["close"].shift(1).fillna(df["close"].iloc[0])
    df["high"] = df[["open", "close"]].max(axis=1) * (
        1 + np.random.rand(len(df)) * 0.01
    )
    df["low"] = df[["open", "close"]].min(axis=1) * (1 - np.random.rand(len(df)) * 0.01)
    df["volume"] = np.random.randint(100, 1000, size=len(df))
    return df


# 生成两只随机股票数据
df1 = make_df(seed=1)
df2 = make_df(seed=2)


# 策略:每天开盘买入昨日跌幅最大的一只(如果有跌幅),收盘时清仓
class LargestDropStrategy(bt.Strategy):
    def next(self):
        # 需要至少两天数据
        if len(self.datas[0].close) < 2:
            return

        # 计算每只股票的昨日跌幅(正值表示下跌幅度)
        drops = []
        for data in self.datas:
            y_close = data.close[-1]
            d2_close = data.close[-2]
            diff = y_close - d2_close
            drops.append(-diff if diff < 0 else 0)

        max_drop = max(drops)
        # 如果有跌幅,则选跌幅最大的一只并在开盘价买入
        if max_drop > 0:
            idx = drops.index(max_drop)
            best_data = self.datas[idx]
            open_price = best_data.open[0]
            if self.getposition(best_data).size == 0:
                self.buy(data=best_data, size=1)
                print(
                    f"{best_data._name} BUY at open {open_price:.2f} on {best_data.datetime.date(0)}"
                )

        # 收盘时清仓
        for data in self.datas:
            pos = self.getposition(data).size
            if pos > 0:
                close_price = data.close[0]
                self.sell(data=data, size=pos)
                print(
                    f"{data._name} SELL at close {close_price:.2f} on {data.datetime.date(0)}"
                )


if __name__ == "__main__":
    cerebro = bt.Cerebro()

    # 将两只股票数据加入回测
    data1 = bt.feeds.PandasData(dataname=df1, name="Stock1")
    data2 = bt.feeds.PandasData(dataname=df2, name="Stock2")
    cerebro.adddata(data1)
    cerebro.adddata(data2)

    cerebro.addstrategy(LargestDropStrategy)
    cerebro.broker.setcash(100000.0)

    print("Starting Portfolio Value: %.2f" % cerebro.broker.getvalue())
    cerebro.run()
    print("Final Portfolio Value: %.2f" % cerebro.broker.getvalue())

3. 示例代码3

载入自己的数据df:

  • 从2012开始的AH股数据

继续用每天开盘买入昨天下跌幅度最大的一支股票(如果都没跌则不买),并且在收盘进行清仓的逻辑

不过需要额外进行数据处理

  • 考虑缺失值:包含还未上市股票和偶尔停牌情况
    • 处理方式向前填入缺失值,同时打上标记。在进行策略时忽略当日的缺失股
import pandas as pd
import backtrader as bt
from datetime import datetime

# ———————————— 0. 读取你的 AH 数据 ————————————
# ahp = pd.read_csv("你的 ahp.csv", dtype={"ts_code_A": str, "ts_code_H": str})
# 这里假设 ahp 是包含 A 股和 H 股调整后 OHLCV 的 DataFrame

# ———————————— 1. 预处理日期列 ————————————
# 将交易日从整数/字符串格式转换为 pandas 的 datetime 类型,方便后续索引和处理
ahp["trade_date"] = pd.to_datetime(
    ahp["trade_date"].astype(str),  # 原始格式如 20191122
    format="%Y%m%d",  # 指定日期格式
)
# 按日期排序,确保数据按时间顺序排列
ahp.sort_values("trade_date", inplace=True)

# 计算全期主交易日历:从最早交易日到最晚交易日,按工作日(Business Day)频率生成日期索引
earliest = ahp["trade_date"].min()
latest = ahp["trade_date"].max()
all_dates = pd.date_range(start=earliest, end=latest, freq="B")  # B 代表工作日


# ———————————— 2. 定义自有数据格式(CustomPandasData) ————————————
# 扩展 bt.feeds.PandasData,新增一个 "listed" 字段,用于标识当日是否有真实交易(未上市/停牌)
class CustomPandasData(bt.feeds.PandasData):
    # 定义新的 line
    lines = ("listed",)
    # params 用于映射 feed 中的列名到 backtrader 内部属性
    params = (
        ("datetime", None),  # 使用 DataFrame 的 index 作为日期时间
        ("open", "open"),  # open 列
        ("high", "high"),  # high 列
        ("low", "low"),  # low 列
        ("close", "close"),  # close 列
        ("volume", "volume"),  # volume 列
        ("openinterest", None),  # 不使用 openinterest
        ("listed", "listed"),  # 新增 listed 列
    )


# ———————————— 3. 策略逻辑:LargestDropStrategy ————————————
# 策略思路:每个 bar 计算所有股票的单日跌幅,择最大跌幅标的买入,收盘清仓
class LargestDropStrategy(bt.Strategy):
    def notify_order(self, order):
        """
        订单状态通知
        order.status:
          - Submitted/Accepted: 已提交或被接受
          - Completed: 已成交
          - Canceled/Margin/Rejected: 取消/保证金不足/拒单
        """
        # 仅在订单完成或失败时打印日志
        if order.status in [order.Submitted, order.Accepted]:
            return  # 尚未决策成交

        if order.status == order.Completed:
            # 成交后打印买卖类型、标的名称、成交价、成交量
            action = "BUY" if order.isbuy() else "SELL"
            print(
                f"{action} EXECUTED: {order.data._name},"
                f" Price: {order.executed.price:.2f},"
                f" Size: {order.executed.size}"
            )
        else:
            # 其他失败情况打印提示
            print(
                f"Order {order.getordername()} on {order.data._name} "
                f"Canceled/Margin/Rejected"
            )

    def notify_trade(self, trade):
        """
        成交明细通知,当一笔交易(开仓+平仓)完全结束后触发
        trade.pnl: 毛利润(不含佣金)
        trade.pnlcomm: 扣除佣金滑点后的净利润
        """
        if trade.isclosed:
            print(
                f"TRADE CLOSED: {trade.data._name},"
                f" Gross PnL: {trade.pnl:.2f},"
                f" Net PnL: {trade.pnlcomm:.2f}"
            )

    def next(self):
        """
        核心交易逻辑:
          1. 跳过数据不足或无效 bar
          2. 计算当日跌幅列表
          3. 选最大跌幅标的买入
          4. 收盘统一平仓
        """
        # 至少需要两根 bar 才能计算昨日收盘价
        if len(self.datas[0].close) < 2:
            return

        drops = []  # 存储每个标的的跌幅
        for data in self.datas:
            # 1) 如果未上市或停牌,则跌幅记为 0 并跳过
            if data.listed[0] == 0 or data.volume[0] == 0:
                drops.append(0)
                continue

            # 2) 获取昨日和今日收盘价
            prev_close = data.close[-2]
            curr_close = data.close[-1]
            # 若价格无效(<=0 或 NaN),同样跳过
            if prev_close <= 0 or curr_close <= 0:
                drops.append(0)
                continue

            # 3) 计算跌幅:只有跌才计负值(方便取最大跌幅)
            diff = curr_close - prev_close
            drops.append(-diff if diff < 0 else 0)

        # 如果存在正的跌幅(即真正下跌),择最大值的标的买入
        max_drop = max(drops)
        if max_drop > 0:
            idx = drops.index(max_drop)
            target = self.datas[idx]
            open_price = target.open[0]
            # 确保开盘价有效且当前未持仓
            if open_price > 0 and self.getposition(target).size == 0:
                print(
                    f"BUY SIGNAL: {target._name} at Open {open_price:.2f} on "
                    f"{target.datetime.date(0)}"
                )
                self.buy(data=target, size=1)

        # 收盘时,对所有持仓进行平仓
        for data in self.datas:
            pos_size = self.getposition(data).size
            if pos_size > 0:
                # 只在有行情(listed==1, vol>0)日平仓也可按需添加判断
                close_price = data.close[0]
                print(
                    f"SELL SIGNAL: {data._name} at Close {close_price:.2f} on "
                    f"{data.datetime.date(0)}"
                )
                self.sell(data=data, size=pos_size)


# ———————————— 4. 主函数:喂入数据、运行回测 ————————————
if __name__ == "__main__":
    # 1) 初始化 Cerebro 引擎
    cerebro = bt.Cerebro()
    # 2) 设置初始资金
    cerebro.broker.setcash(1_000_000.0)

    # —— 4.1 导入 A 股数据 ——
    for code in ahp["ts_code_A"].unique():
        df = ahp[ahp["ts_code_A"] == code].copy()
        df.set_index("trade_date", inplace=True)

        # 重命名列并选取需要的 OHLCV
        df_feed = (
            df[
                ["open_adj_A", "high_adj_A", "low_adj_A", "close_adj_A", "vol_A"]
            ].rename(
                columns={
                    "open_adj_A": "open",
                    "high_adj_A": "high",
                    "low_adj_A": "low",
                    "close_adj_A": "close",
                    "vol_A": "volume",
                }
            )
            # 按主交易日历补全所有日期
            .reindex(all_dates)
        )
        # listed: 真实交易日为 1,停牌/未上市为 0
        df_feed["listed"] = df_feed["close"].notna().astype(int)
        # 对 OHLC 做前向填充,停牌日价格维持前一有效价,上市前置 0
        df_feed[["open", "high", "low", "close"]] = (
            df_feed[["open", "high", "low", "close"]].ffill().fillna(0.0)
        )
        # volume 缺失或停牌置 0
        df_feed["volume"] = df_feed["volume"].fillna(0)

        # 获取标的名称(或 ts_code)作为数据源名称
        name = df.get("name", pd.Series([code])).iat[0]
        data_feed = CustomPandasData(dataname=df_feed, name=f"{name}_A")
        cerebro.adddata(data_feed)

    # —— 4.2 导入 H 股数据 ——
    for code in ahp["ts_code_H"].unique():
        df = ahp[ahp["ts_code_H"] == code].copy()
        df.set_index("trade_date", inplace=True)

        df_feed = (
            df[["open_adj_H", "high_adj_H", "low_adj_H", "close_adj_H", "vol_H"]]
            .rename(
                columns={
                    "open_adj_H": "open",
                    "high_adj_H": "high",
                    "low_adj_H": "low",
                    "close_adj_H": "close",
                    "vol_H": "volume",
                }
            )
            .reindex(all_dates)
        )
        df_feed["listed"] = df_feed["close"].notna().astype(int)
        df_feed[["open", "high", "low", "close"]] = (
            df_feed[["open", "high", "low", "close"]].ffill().fillna(0.0)
        )
        df_feed["volume"] = df_feed["volume"].fillna(0)

        name = df.get("name", pd.Series([code])).iat[0]
        data_feed = CustomPandasData(dataname=df_feed, name=f"{name}_H")
        cerebro.adddata(data_feed)

    # 5) 添加策略并运行
    cerebro.addstrategy(LargestDropStrategy)
    print(f"Starting Portfolio Value: {cerebro.broker.getvalue():.2f}")
    cerebro.run()
    print(f"Final Portfolio Value:   {cerebro.broker.getvalue():.2f}")

运行情况:

发表评论