1. 示例代码1
backtrader的示例代码,如果标的价格昨天降了则买,昨天涨了则卖,标的价格随机生成,模拟一年的数据
import pandas as pd
import numpy as np
import backtrader as bt
# 生成一年的随机价格数据(交易日)
dates = pd.date_range(start="2020-01-01", periods=252, freq="B") # 252 个交易日
# 随机价格生成:起始价 100,然后累积正态分布波动
price = 100 + np.random.randn(len(dates)).cumsum()
# 构造 DataFrame
df = pd.DataFrame(index=dates)
df["close"] = price
df["open"] = df["close"].shift(1).fillna(df["close"][0])
# high/low 加入小幅随机波动
df["high"] = df[["open", "close"]].max(axis=1) * (1 + np.random.rand(len(df)) * 0.01)
df["low"] = df[["open", "close"]].min(axis=1) * (1 - np.random.rand(len(df)) * 0.01)
# 随机成交量
np.random.seed(42)
df["volume"] = np.random.randint(100, 1000, size=len(df))
# 定义策略:如果昨天收盘价比前天低则买入,昨天收盘价比前天高则卖出
class BuyOnDownSellOnUp(bt.Strategy):
def next(self):
# 确保至少有前一天数据
if len(self.data.close) < 2:
return
today_close = self.data.close[0]
yesterday_close = self.data.close[-1]
# 下跌买入
if today_close < yesterday_close:
if not self.position:
self.buy(size=1)
print(f"{self.data.datetime.date(0)} BUY at {today_close:.2f}")
# 上涨卖出
elif today_close > yesterday_close:
if self.position:
self.sell(size=1)
print(f"{self.data.datetime.date(0)} SELL at {today_close:.2f}")
if __name__ == "__main__":
cerebro = bt.Cerebro()
# 加载数据
datafeed = bt.feeds.PandasData(dataname=df)
cerebro.adddata(datafeed)
# 添加策略
cerebro.addstrategy(BuyOnDownSellOnUp)
# 设置初始资金
cerebro.broker.setcash(100000.0)
# 运行回测
print("Starting Portfolio Value: %.2f" % cerebro.broker.getvalue())
cerebro.run()
print("Final Portfolio Value: %.2f" % cerebro.broker.getvalue())
运行能看到所有交易行为以及最后的资产情况

2. 示例代码2
再加入一只股票(也随机),每天开盘买入昨天下跌幅度最大的一支股票(如果都没跌则不买),并且在收盘进行清仓
import pandas as pd
import numpy as np
import backtrader as bt
# 生成一年的随机价格数据(交易日)
dates = pd.date_range(start="2020-01-01", periods=252, freq="B")
# 辅助函数:根据随机种子生成单只股票的OHLCV数据
def make_df(seed):
np.random.seed(seed)
price = 100 + np.random.randn(len(dates)).cumsum()
df = pd.DataFrame(index=dates)
df["close"] = price
df["open"] = df["close"].shift(1).fillna(df["close"].iloc[0])
df["high"] = df[["open", "close"]].max(axis=1) * (
1 + np.random.rand(len(df)) * 0.01
)
df["low"] = df[["open", "close"]].min(axis=1) * (1 - np.random.rand(len(df)) * 0.01)
df["volume"] = np.random.randint(100, 1000, size=len(df))
return df
# 生成两只随机股票数据
df1 = make_df(seed=1)
df2 = make_df(seed=2)
# 策略:每天开盘买入昨日跌幅最大的一只(如果有跌幅),收盘时清仓
class LargestDropStrategy(bt.Strategy):
def next(self):
# 需要至少两天数据
if len(self.datas[0].close) < 2:
return
# 计算每只股票的昨日跌幅(正值表示下跌幅度)
drops = []
for data in self.datas:
y_close = data.close[-1]
d2_close = data.close[-2]
diff = y_close - d2_close
drops.append(-diff if diff < 0 else 0)
max_drop = max(drops)
# 如果有跌幅,则选跌幅最大的一只并在开盘价买入
if max_drop > 0:
idx = drops.index(max_drop)
best_data = self.datas[idx]
open_price = best_data.open[0]
if self.getposition(best_data).size == 0:
self.buy(data=best_data, size=1)
print(
f"{best_data._name} BUY at open {open_price:.2f} on {best_data.datetime.date(0)}"
)
# 收盘时清仓
for data in self.datas:
pos = self.getposition(data).size
if pos > 0:
close_price = data.close[0]
self.sell(data=data, size=pos)
print(
f"{data._name} SELL at close {close_price:.2f} on {data.datetime.date(0)}"
)
if __name__ == "__main__":
cerebro = bt.Cerebro()
# 将两只股票数据加入回测
data1 = bt.feeds.PandasData(dataname=df1, name="Stock1")
data2 = bt.feeds.PandasData(dataname=df2, name="Stock2")
cerebro.adddata(data1)
cerebro.adddata(data2)
cerebro.addstrategy(LargestDropStrategy)
cerebro.broker.setcash(100000.0)
print("Starting Portfolio Value: %.2f" % cerebro.broker.getvalue())
cerebro.run()
print("Final Portfolio Value: %.2f" % cerebro.broker.getvalue())
