1. 示例代码1
backtrader的示例代码,如果标的价格昨天降了则买,昨天涨了则卖,标的价格随机生成,模拟一年的数据
import pandas as pd
import numpy as np
import backtrader as bt
# 生成一年的随机价格数据(交易日)
dates = pd.date_range(start="2020-01-01", periods=252, freq="B") # 252 个交易日
# 随机价格生成:起始价 100,然后累积正态分布波动
price = 100 + np.random.randn(len(dates)).cumsum()
# 构造 DataFrame
df = pd.DataFrame(index=dates)
df["close"] = price
df["open"] = df["close"].shift(1).fillna(df["close"][0])
# high/low 加入小幅随机波动
df["high"] = df[["open", "close"]].max(axis=1) * (1 + np.random.rand(len(df)) * 0.01)
df["low"] = df[["open", "close"]].min(axis=1) * (1 - np.random.rand(len(df)) * 0.01)
# 随机成交量
np.random.seed(42)
df["volume"] = np.random.randint(100, 1000, size=len(df))
# 定义策略:如果昨天收盘价比前天低则买入,昨天收盘价比前天高则卖出
class BuyOnDownSellOnUp(bt.Strategy):
def next(self):
# 确保至少有前一天数据
if len(self.data.close) < 2:
return
today_close = self.data.close[0]
yesterday_close = self.data.close[-1]
# 下跌买入
if today_close < yesterday_close:
if not self.position:
self.buy(size=1)
print(f"{self.data.datetime.date(0)} BUY at {today_close:.2f}")
# 上涨卖出
elif today_close > yesterday_close:
if self.position:
self.sell(size=1)
print(f"{self.data.datetime.date(0)} SELL at {today_close:.2f}")
if __name__ == "__main__":
cerebro = bt.Cerebro()
# 加载数据
datafeed = bt.feeds.PandasData(dataname=df)
cerebro.adddata(datafeed)
# 添加策略
cerebro.addstrategy(BuyOnDownSellOnUp)
# 设置初始资金
cerebro.broker.setcash(100000.0)
# 运行回测
print("Starting Portfolio Value: %.2f" % cerebro.broker.getvalue())
cerebro.run()
print("Final Portfolio Value: %.2f" % cerebro.broker.getvalue())
运行能看到所有交易行为以及最后的资产情况

2. 示例代码2
再加入一只股票(也随机),每天开盘买入昨天下跌幅度最大的一支股票(如果都没跌则不买),并且在收盘进行清仓
import pandas as pd
import numpy as np
import backtrader as bt
# 生成一年的随机价格数据(交易日)
dates = pd.date_range(start="2020-01-01", periods=252, freq="B")
# 辅助函数:根据随机种子生成单只股票的OHLCV数据
def make_df(seed):
np.random.seed(seed)
price = 100 + np.random.randn(len(dates)).cumsum()
df = pd.DataFrame(index=dates)
df["close"] = price
df["open"] = df["close"].shift(1).fillna(df["close"].iloc[0])
df["high"] = df[["open", "close"]].max(axis=1) * (
1 + np.random.rand(len(df)) * 0.01
)
df["low"] = df[["open", "close"]].min(axis=1) * (1 - np.random.rand(len(df)) * 0.01)
df["volume"] = np.random.randint(100, 1000, size=len(df))
return df
# 生成两只随机股票数据
df1 = make_df(seed=1)
df2 = make_df(seed=2)
# 策略:每天开盘买入昨日跌幅最大的一只(如果有跌幅),收盘时清仓
class LargestDropStrategy(bt.Strategy):
def next(self):
# 需要至少两天数据
if len(self.datas[0].close) < 2:
return
# 计算每只股票的昨日跌幅(正值表示下跌幅度)
drops = []
for data in self.datas:
y_close = data.close[-1]
d2_close = data.close[-2]
diff = y_close - d2_close
drops.append(-diff if diff < 0 else 0)
max_drop = max(drops)
# 如果有跌幅,则选跌幅最大的一只并在开盘价买入
if max_drop > 0:
idx = drops.index(max_drop)
best_data = self.datas[idx]
open_price = best_data.open[0]
if self.getposition(best_data).size == 0:
self.buy(data=best_data, size=1)
print(
f"{best_data._name} BUY at open {open_price:.2f} on {best_data.datetime.date(0)}"
)
# 收盘时清仓
for data in self.datas:
pos = self.getposition(data).size
if pos > 0:
close_price = data.close[0]
self.sell(data=data, size=pos)
print(
f"{data._name} SELL at close {close_price:.2f} on {data.datetime.date(0)}"
)
if __name__ == "__main__":
cerebro = bt.Cerebro()
# 将两只股票数据加入回测
data1 = bt.feeds.PandasData(dataname=df1, name="Stock1")
data2 = bt.feeds.PandasData(dataname=df2, name="Stock2")
cerebro.adddata(data1)
cerebro.adddata(data2)
cerebro.addstrategy(LargestDropStrategy)
cerebro.broker.setcash(100000.0)
print("Starting Portfolio Value: %.2f" % cerebro.broker.getvalue())
cerebro.run()
print("Final Portfolio Value: %.2f" % cerebro.broker.getvalue())

3. 示例代码3
载入自己的数据df:

- 从2012开始的AH股数据
继续用每天开盘买入昨天下跌幅度最大的一支股票(如果都没跌则不买),并且在收盘进行清仓的逻辑
不过需要额外进行数据处理
- 考虑缺失值:包含还未上市股票和偶尔停牌情况
- 处理方式向前填入缺失值,同时打上标记。在进行策略时忽略当日的缺失股
import pandas as pd
import backtrader as bt
from datetime import datetime
# ———————————— 0. 读取你的 AH 数据 ————————————
# ahp = pd.read_csv("你的 ahp.csv", dtype={"ts_code_A": str, "ts_code_H": str})
# 这里假设 ahp 是包含 A 股和 H 股调整后 OHLCV 的 DataFrame
# ———————————— 1. 预处理日期列 ————————————
# 将交易日从整数/字符串格式转换为 pandas 的 datetime 类型,方便后续索引和处理
ahp["trade_date"] = pd.to_datetime(
ahp["trade_date"].astype(str), # 原始格式如 20191122
format="%Y%m%d", # 指定日期格式
)
# 按日期排序,确保数据按时间顺序排列
ahp.sort_values("trade_date", inplace=True)
# 计算全期主交易日历:从最早交易日到最晚交易日,按工作日(Business Day)频率生成日期索引
earliest = ahp["trade_date"].min()
latest = ahp["trade_date"].max()
all_dates = pd.date_range(start=earliest, end=latest, freq="B") # B 代表工作日
# ———————————— 2. 定义自有数据格式(CustomPandasData) ————————————
# 扩展 bt.feeds.PandasData,新增一个 "listed" 字段,用于标识当日是否有真实交易(未上市/停牌)
class CustomPandasData(bt.feeds.PandasData):
# 定义新的 line
lines = ("listed",)
# params 用于映射 feed 中的列名到 backtrader 内部属性
params = (
("datetime", None), # 使用 DataFrame 的 index 作为日期时间
("open", "open"), # open 列
("high", "high"), # high 列
("low", "low"), # low 列
("close", "close"), # close 列
("volume", "volume"), # volume 列
("openinterest", None), # 不使用 openinterest
("listed", "listed"), # 新增 listed 列
)
# ———————————— 3. 策略逻辑:LargestDropStrategy ————————————
# 策略思路:每个 bar 计算所有股票的单日跌幅,择最大跌幅标的买入,收盘清仓
class LargestDropStrategy(bt.Strategy):
def notify_order(self, order):
"""
订单状态通知
order.status:
- Submitted/Accepted: 已提交或被接受
- Completed: 已成交
- Canceled/Margin/Rejected: 取消/保证金不足/拒单
"""
# 仅在订单完成或失败时打印日志
if order.status in [order.Submitted, order.Accepted]:
return # 尚未决策成交
if order.status == order.Completed:
# 成交后打印买卖类型、标的名称、成交价、成交量
action = "BUY" if order.isbuy() else "SELL"
print(
f"{action} EXECUTED: {order.data._name},"
f" Price: {order.executed.price:.2f},"
f" Size: {order.executed.size}"
)
else:
# 其他失败情况打印提示
print(
f"Order {order.getordername()} on {order.data._name} "
f"Canceled/Margin/Rejected"
)
def notify_trade(self, trade):
"""
成交明细通知,当一笔交易(开仓+平仓)完全结束后触发
trade.pnl: 毛利润(不含佣金)
trade.pnlcomm: 扣除佣金滑点后的净利润
"""
if trade.isclosed:
print(
f"TRADE CLOSED: {trade.data._name},"
f" Gross PnL: {trade.pnl:.2f},"
f" Net PnL: {trade.pnlcomm:.2f}"
)
def next(self):
"""
核心交易逻辑:
1. 跳过数据不足或无效 bar
2. 计算当日跌幅列表
3. 选最大跌幅标的买入
4. 收盘统一平仓
"""
# 至少需要两根 bar 才能计算昨日收盘价
if len(self.datas[0].close) < 2:
return
drops = [] # 存储每个标的的跌幅
for data in self.datas:
# 1) 如果未上市或停牌,则跌幅记为 0 并跳过
if data.listed[0] == 0 or data.volume[0] == 0:
drops.append(0)
continue
# 2) 获取昨日和今日收盘价
prev_close = data.close[-2]
curr_close = data.close[-1]
# 若价格无效(<=0 或 NaN),同样跳过
if prev_close <= 0 or curr_close <= 0:
drops.append(0)
continue
# 3) 计算跌幅:只有跌才计负值(方便取最大跌幅)
diff = curr_close - prev_close
drops.append(-diff if diff < 0 else 0)
# 如果存在正的跌幅(即真正下跌),择最大值的标的买入
max_drop = max(drops)
if max_drop > 0:
idx = drops.index(max_drop)
target = self.datas[idx]
open_price = target.open[0]
# 确保开盘价有效且当前未持仓
if open_price > 0 and self.getposition(target).size == 0:
print(
f"BUY SIGNAL: {target._name} at Open {open_price:.2f} on "
f"{target.datetime.date(0)}"
)
self.buy(data=target, size=1)
# 收盘时,对所有持仓进行平仓
for data in self.datas:
pos_size = self.getposition(data).size
if pos_size > 0:
# 只在有行情(listed==1, vol>0)日平仓也可按需添加判断
close_price = data.close[0]
print(
f"SELL SIGNAL: {data._name} at Close {close_price:.2f} on "
f"{data.datetime.date(0)}"
)
self.sell(data=data, size=pos_size)
# ———————————— 4. 主函数:喂入数据、运行回测 ————————————
if __name__ == "__main__":
# 1) 初始化 Cerebro 引擎
cerebro = bt.Cerebro()
# 2) 设置初始资金
cerebro.broker.setcash(1_000_000.0)
# —— 4.1 导入 A 股数据 ——
for code in ahp["ts_code_A"].unique():
df = ahp[ahp["ts_code_A"] == code].copy()
df.set_index("trade_date", inplace=True)
# 重命名列并选取需要的 OHLCV
df_feed = (
df[
["open_adj_A", "high_adj_A", "low_adj_A", "close_adj_A", "vol_A"]
].rename(
columns={
"open_adj_A": "open",
"high_adj_A": "high",
"low_adj_A": "low",
"close_adj_A": "close",
"vol_A": "volume",
}
)
# 按主交易日历补全所有日期
.reindex(all_dates)
)
# listed: 真实交易日为 1,停牌/未上市为 0
df_feed["listed"] = df_feed["close"].notna().astype(int)
# 对 OHLC 做前向填充,停牌日价格维持前一有效价,上市前置 0
df_feed[["open", "high", "low", "close"]] = (
df_feed[["open", "high", "low", "close"]].ffill().fillna(0.0)
)
# volume 缺失或停牌置 0
df_feed["volume"] = df_feed["volume"].fillna(0)
# 获取标的名称(或 ts_code)作为数据源名称
name = df.get("name", pd.Series([code])).iat[0]
data_feed = CustomPandasData(dataname=df_feed, name=f"{name}_A")
cerebro.adddata(data_feed)
# —— 4.2 导入 H 股数据 ——
for code in ahp["ts_code_H"].unique():
df = ahp[ahp["ts_code_H"] == code].copy()
df.set_index("trade_date", inplace=True)
df_feed = (
df[["open_adj_H", "high_adj_H", "low_adj_H", "close_adj_H", "vol_H"]]
.rename(
columns={
"open_adj_H": "open",
"high_adj_H": "high",
"low_adj_H": "low",
"close_adj_H": "close",
"vol_H": "volume",
}
)
.reindex(all_dates)
)
df_feed["listed"] = df_feed["close"].notna().astype(int)
df_feed[["open", "high", "low", "close"]] = (
df_feed[["open", "high", "low", "close"]].ffill().fillna(0.0)
)
df_feed["volume"] = df_feed["volume"].fillna(0)
name = df.get("name", pd.Series([code])).iat[0]
data_feed = CustomPandasData(dataname=df_feed, name=f"{name}_H")
cerebro.adddata(data_feed)
# 5) 添加策略并运行
cerebro.addstrategy(LargestDropStrategy)
print(f"Starting Portfolio Value: {cerebro.broker.getvalue():.2f}")
cerebro.run()
print(f"Final Portfolio Value: {cerebro.broker.getvalue():.2f}")
运行情况:
