# 缠论中枢绘制 - 技术分享

Author: abc_quant, Created: 2020-10-15 17:09:00, Updated:

```'''
start: 2020-10-1 00:00:00
end: 2020-10-15 16:00:00
period: 1h
basePeriod: 1h
exchanges: [{"eid":"OKEX","currency":"BTC_USDT","stocks":1}]
'''
import pandas as pd
from fmz import * # 导入所有FMZ函数
#!pip install --user mplfinance
#import sys
#sys.path.append('/home/quant/.local/lib/python3.6/site-packages')
#import mplfinance```
```# 第三方函数库
import numpy as np
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
#import mplfinance as mpf
import matplotlib.patches as patches
import talib
import datetime
import warnings
warnings.filterwarnings("ignore")

def get_k_series():
#
# 获取k线序列，默认为30分钟级别
# 输入：n是级别，单位是分钟
# 输出：pandas, k线序列
n = 1

one_min_data = pd.DataFrame(exchange.GetRecords())
one_min_data = one_min_data.rename(columns={'Time':'date','Open':'open','Close':'close','High':'high','Low':'low'})
one_min_data['date'] = one_min_data['date'].apply(lambda x:_D(x/1000))
one_min_data['Date'] = one_min_data['date'].apply(lambda x:pd.to_datetime(x))
one_min_data.set_index('Date',inplace=True)

#print(one_min_data)
n_min_data = pd.DataFrame()
for i in range(n, len(one_min_data) + 1, n):

interval = one_min_data.iloc[i - n:i]
interval_open = interval.open.iloc[0]
interval_high = max(interval.high)
interval_low = min(interval.low)
interval_date = interval.date
interval_k = pd.DataFrame(interval[-1:])  # 新建DataFrame，否则会报SettingWithCopyWarning
interval_k.open = interval_open
interval_k.high = interval_high
interval_k.low = interval_low
interval_k.date = interval_date
#print(interval_k)
n_min_data = pd.concat([n_min_data, interval_k], axis=0)
n_min_data = n_min_data.reset_index()
#del n_min_data['instrument']
#del n_min_data['index']
#print(n_min_data)
return n_min_data

def get_binary_positions(k_data):
#
# 计算k线序列的二分位值
# 输入：k线序列
# 输出：list, k线序列对应的二分位值
binary_positions = []
for i in range(len(k_data)):
temp_y = (k_data.high[i] + k_data.low[i]) / 2.0
binary_positions.append(temp_y)
return binary_positions

#
# 判断k线的包含关系，便于寻找顶分型和底分型
# 输入：k线序列
trend = [0]
temp_data = k_data[:1]
#print(temp_data)
#return
for i in range(len(k_data)):
#print("处理：",i)
is_equal = temp_data.high.iloc[-1] == k_data.high.iloc[i] and temp_data.low.iloc[-1] == k_data.low.iloc[i]  # 第1根等于第2根

# 向右包含
if temp_data.high.iloc[-1] >= k_data.high.iloc[i] and temp_data.low.iloc[-1] <= k_data.low.iloc[i] and not is_equal:
if trend[-1] == -1:
temp_data.high.iloc[-1] = k_data.high.iloc[i]
else:
temp_data.low.iloc[-1] = k_data.low.iloc[i]

# 向左包含
elif temp_data.high.iloc[-1] <= k_data.high.iloc[i] and temp_data.low.iloc[-1] >= k_data.low.iloc[i] and not is_equal:
if trend[-1] == -1:
temp_data.low.iloc[-1] = k_data.low.iloc[i]
else:
temp_data.high.iloc[-1] = k_data.high.iloc[i]

elif is_equal:
trend.append(0)

elif temp_data.high.iloc[-1] > k_data.high.iloc[i] and temp_data.low.iloc[-1] > k_data.low.iloc[i]:
trend.append(-1)
temp_data = k_data[i:i + 1]

elif temp_data.high.iloc[-1] < k_data.high.iloc[i] and temp_data.low.iloc[-1] < k_data.low.iloc[i]:
trend.append(1)
temp_data = k_data[i:i + 1]

#print("处理判断完毕：",i)

#print("调整收盘价和开盘价：",i)
# 调整收盘价和开盘价
if temp_data.open.iloc[-1] > temp_data.close.iloc[-1]:
if temp_data.open.iloc[-1] > temp_data.high.iloc[-1]:
temp_data.open.iloc[-1] = temp_data.high.iloc[-1]
if temp_data.close.iloc[-1] < temp_data.low.iloc[-1]:
temp_data.close.iloc[-1] = temp_data.low.iloc[-1]
else:
if temp_data.open.iloc[-1] < temp_data.low.iloc[-1]:
temp_data.open.iloc[-1] = temp_data.low.iloc[-1]
if temp_data.close.iloc[-1] > temp_data.high.iloc[-1]:
temp_data.close.iloc[-1] = temp_data.high.iloc[-1]

#print("调整收盘价和开盘价完毕：",i)

#
# 寻找顶分型和底分型
# 1）连续分型选择最极端值
# 2）分型之间保证3根k线
# 输入：调整后的k线序列
# 输出：顶分型和底分型的位置

temp_num = 0  # 上一个顶或底的位置
temp_high = 0  # 上一个顶的high值
temp_low = 0  # 上一个底的low值
temp_type = 0  # 上一个记录位置的类型

fx_type = []  # 记录分型点的类型，1为顶分型，-1为底分型
fx_time = []  # 记录分型点的时间
fx_plot = []  # 记录点的数值，为顶分型取high值，为底分型取low值
fx_data = pd.DataFrame()  # 记录分型
fx_offset = []

# 加上线段起点
fx_type.append(0)
fx_offset.append(0)

i = 1
while (i < len(adjusted_k_data) - 1):

if top:
if temp_type == 1:
# 如果上一个分型为顶分型，则进行比较，选取高点更高的分型
i += 1
else:
temp_num = i
temp_type = 1
i += 2  # 两个分型之间至少有3根k线
elif temp_type == -1:
# 如果上一个分型为底分型，则记录上一个分型，用当前分型与后面的分型比较，选取同向更极端的分型
# 如果上一个底分型的底比当前顶分型的顶高，则跳过当前顶分型。
i += 1
else:
fx_type.append(-1)
fx_data = pd.concat([fx_data, adjusted_k_data[temp_num:temp_num + 1]], axis=0)
fx_plot.append(temp_low)
fx_offset.append(temp_num)
temp_num = i
temp_type = 1
i += 2  # 两个分型之间至少有3根k线
else:
temp_num = i
temp_type = 1
i += 2

elif bottom:
if temp_type == -1:
# 如果上一个分型为底分型，则进行比较，选取低点更低的分型
i += 1
else:
temp_num = i
temp_type = -1
i += 2
elif temp_type == 1:
# 如果上一个分型为顶分型，则记录上一个分型，用当前分型与后面的分型比较，选取同向更极端的分型
# 如果上一个顶分型的底比当前底分型的底低，则跳过当前底分型。
i += 1
else:
fx_type.append(1)
fx_data = pd.concat([fx_data, adjusted_k_data[temp_num:temp_num + 1]], axis=0)
fx_plot.append(temp_high)
fx_offset.append(temp_num)
temp_num = i
temp_type = -1
i += 2
else:
temp_num = i
temp_type = -1
i += 2
else:
i += 1

# 加上最后一个分型（上面的循环中最后的一个分型并未处理）
if temp_type == -1:
fx_type.append(-1)
fx_data = pd.concat([fx_data, adjusted_k_data[temp_num:temp_num + 1]], axis=0)
fx_plot.append(temp_low)
fx_offset.append(temp_num)
elif temp_type == 1:
fx_type.append(1)
fx_data = pd.concat([fx_data, adjusted_k_data[temp_num:temp_num + 1]], axis=0)
fx_plot.append(temp_high)
fx_offset.append(temp_num)

# 加上线段终点
fx_type.append(0)

return fx_type, fx_time, fx_data, fx_plot, fx_offset

def get_pivot(fx_plot, fx_offset, fx_observe):
#
# 计算最近的中枢
# 注意：一个中枢至少有三笔
# fx_plot 笔的节点股价
# fx_offset 笔的节点时间点（偏移）
# fx_observe 所观测的分型点

if fx_observe < 1:
# 处理边界
right_bound = 0
left_bount = 0
min_high = 0
max_low = 0
pivot_x_interval = [left_bount, right_bound]
pivot_price_interval = [max_low, min_high]
return pivot_x_interval, pivot_price_interval

right_bound = (fx_offset[fx_observe] + fx_offset[fx_observe - 1]) / 2
# 右边界是所观察分型的上一笔中位
left_bount = 0
min_high = 0
max_low = 0

if fx_plot[fx_observe] >= fx_plot[fx_observe - 1]:
# 所观察分型的上一笔是往上的一笔
min_high = fx_plot[fx_observe]
max_low = fx_plot[fx_observe - 1]
else:  # 所观察分型的上一笔是往下的一笔
max_low = fx_plot[fx_observe]
min_high = fx_plot[fx_observe - 1]

i = fx_observe - 1
cover = 0  # 记录走势的重叠区，至少为3才能画中枢
while (i >= 1):
if fx_plot[i] >= fx_plot[i - 1]:
# 往上的一笔
if fx_plot[i] < max_low or fx_plot[i - 1] > min_high:
# 已经没有重叠区域了
left_bount = (fx_offset[i] + fx_offset[i + 1]) / 2
break
else:
# 有重叠区域
# 计算更窄的中枢价格区间
cover += 1
min_high = min(fx_plot[i], min_high)
max_low = max(fx_plot[i - 1], max_low)

elif fx_plot[i] < fx_plot[i - 1]:
# 往下的一笔
if fx_plot[i] > min_high or fx_plot[i - 1] < max_low:
# 已经没有重叠区域了
left_bount = (fx_offset[i] + fx_offset[i + 1]) / 2
break
else:
# 有重叠区域
# 计算更窄的中枢价格区间
cover += 3
min_high = min(fx_plot[i - 1], min_high)
max_low = max(fx_plot[i], max_low)

i -= 1

if cover < 3:
# 不满足中枢定义
right_bound = -1
left_bount = -1
min_high = -1
max_low = -1

pivot_x_interval = [left_bount, right_bound]
pivot_price_interval = [max_low, min_high]
return pivot_x_interval, pivot_price_interval,i

def plot_k_series(ax,k_data):
# 画k线
num_of_ticks = len(k_data)

# fig, ax = plt.subplots(figsize=(num_of_ticks, 20))
dates = k_data.date
# print dates
ax.set_xticks(np.linspace(1, num_of_ticks, num_of_ticks))
ax.set_xticklabels(list(dates))
"""
xticks = list(range(0, len(dates), 10))  # 这里设置的是x轴点的位置（40设置的就是间隔了）
xlabels = [dates[x] for x in xticks ]  # 这里设置X轴上的点对应在数据集中的值（这里用的数据为totalSeed）
xticks.append(len(dates))
xlabels.append(dates[-1])
ax.set_xticks(xticks)
ax.set_xticklabels(xlabels, rotation=40)
"""
#T.plot(k_data,candlestick=True)
print("绘制K线")
plt.plot(k_data.close)
#print(1)
#     mpf.candlestick2_ochl(
#         ax,
#         list(k_data.open), list(k_data.close), list(k_data.high), list(k_data.low),
#         width=0.6, colorup='r', colordown='b', alpha=0.75
#     )
#mpf.plot(k_data,type='candle')

plt.grid(True)
plt.setp(plt.gca().get_xticklabels(), rotation=30)
return dates

def plot_lines(ax, fx_plot, fx_offset):
# 绘制笔和线段
# ax 绘图区域
# fx_plot

plt.plot(fx_offset, fx_plot, 'k', lw=1)
plt.plot(fx_offset, fx_plot, 'o')

def plot_pivot(ax, pivot_date_interval, pivot_price_interval):
#
# 绘制中枢

start_point = (pivot_date_interval[0], pivot_price_interval[0])
width = pivot_date_interval[1] - pivot_date_interval[0]
height = pivot_price_interval[1] - pivot_price_interval[0]
print(
"中枢：",
start_point,  # (x,y)
width,  # width
height,  # height
)
patches.Rectangle(
start_point,  # (x,y)
width,  # width
height,  # height
linewidth=8,
edgecolor='g',
facecolor='none'
)
)
return

def plot_all(select_deta=10,price_percent=0.01):#select_deta 中枢最小间隔  price_percent幅度小于某个值
k_series = get_k_series()
kk=k_series
if(len(kk)<10):
print('k线数量不足')
return
fig = plt.figure(figsize=(50, 20))
plt.rcParams.update({'figure.max_open_warning': 0})

print("处理K线...")

fx_type, fx_time, fx_data, fx_plot, fx_offset = get_fx(adjusted_k_data)
print (fx_type, fx_time)
plot_lines(ax2, fx_plot, fx_offset)
pivot_x_interval, pivot_price_interval,now_index = None,None,len(fx_offset)-2
#last_pivot_x_interval, last_pivot_price_interval,last_index = get_pivot(fx_plot, fx_offset, now_index)

while now_index >= 1:
pivot_x_interval, pivot_price_interval,now_index = get_pivot(fx_plot, fx_offset, now_index)
if pivot_x_interval[0] == -1:
break
else:
if pivot_x_interval[1] - pivot_x_interval[0] < select_deta:
print("pivot_x_interval[1] - pivot_x_interval[0] < select_deta")
continue
hhv = max(k_series.high[int(pivot_x_interval[0]):int(pivot_x_interval[1])])
llv = min(k_series.low[int(pivot_x_interval[0]):int(pivot_x_interval[1])])
hhv_deta = abs((hhv - pivot_price_interval[1]) / pivot_price_interval[1])
llv_deta = abs((llv - pivot_price_interval[0]) / pivot_price_interval[0])
if (hhv_deta > price_percent or llv_deta > price_percent):
print(" (hhv_deta > 1.0 / price_percent or llv_deta > 1.0 / price_percent)")
continue
plot_pivot(ax2, pivot_x_interval, pivot_price_interval)

plot_all(
select_deta = 2,#中枢最小间隔
price_percent = 0.5#幅度小于某个值
)```
```处理K线...

[0, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 0] ['2020-09-22 16:00:00', '2020-09-22 17:00:00', '2020-09-23 01:00:00', '2020-09-23 07:00:00', '2020-09-23 13:00:00', '2020-09-23 15:00:00', '2020-09-23 17:00:00', '2020-09-23 21:00:00', '2020-09-24 03:00:00', '2020-09-24 05:00:00', '2020-09-24 17:00:00', '2020-09-24 19:00:00', '2020-09-24 23:00:00', '2020-09-25 10:00:00', '2020-09-25 20:00:00', '2020-09-26 01:00:00', '2020-09-26 06:00:00', '2020-09-26 10:00:00', '2020-09-26 18:00:00', '2020-09-26 22:00:00', '2020-09-27 01:00:00', '2020-09-27 12:00:00', '2020-09-27 16:00:00', '2020-09-27 18:00:00', '2020-09-28 02:00:00', '2020-09-28 09:00:00', '2020-09-28 13:00:00', '2020-09-28 15:00:00', '2020-09-28 17:00:00', '2020-09-28 19:00:00', '2020-09-28 21:00:00', '2020-09-29 02:00:00', '2020-09-29 04:00:00', '2020-09-29 06:00:00', '2020-09-29 11:00:00', '2020-09-29 17:00:00', '2020-09-30 00:00:00', '2020-09-30 09:00:00', '2020-09-30 17:00:00', '2020-09-30 20:00:00', '2020-10-01 00:00:00']

<Figure size 3600x1440 with 1 Axes>```

More

wildfire 另外这中枢看着好像不对啊，前3个线段的高点低，地点高，好像图上显示的不是这样啊

wildfire 正在看缠论，请教一下，中枢都画出来了，有没有形成策略？策略收益如何？

homily 感觉不错，不过代码有点乱，没有看懂计算笔和和中枢的方法，我要仔细学习一下。另外，请问会不会有未来函数的可能。

wildfire 赞！

abc_quant 中枢价格范围是最低的最高价，以及最高的最低价

abc_quant 目前做研究中，还未做策略

abc_quant 根据历史行情计算的中枢，不存在未来函数