2
ফোকাস
35
অনুসারী

মেশিন লার্নিং: মূল্য প্রবণতা পূর্বাভাস (1)

তৈরি: 2025-07-28 18:53:08, আপডেট করা হয়েছে: 2025-07-29 08:41:50
comments   3
hits   723

মেশিন লার্নিং: মূল্য প্রবণতা পূর্বাভাস (1) পরিপক্ক শ্রেণিবিন্যাসকারী: সাপোর্ট ভেক্টর মেশিন (SVM) হল একটি শক্তিশালী এবং পরিপক্ক বাইনারি (অথবা মাল্টিভেরিয়েট) শ্রেণিবিন্যাস অ্যালগরিদম। স্টক বৃদ্ধি পাবে নাকি পতন হবে তা ভবিষ্যদ্বাণী করা একটি সাধারণ বাইনারি শ্রেণিবিন্যাস সমস্যা। নন-লিনিয়ার ক্যাপাবিলিটি: কার্নেল ফাংশন (যেমন RBF কার্নেল) ব্যবহার করে, SVM ইনপুট বৈশিষ্ট্যগুলির মধ্যে জটিল নন-লিনিয়ার সম্পর্ক ক্যাপচার করতে পারে, যা আর্থিক বাজারের ডেটার জন্য অত্যন্ত গুরুত্বপূর্ণ। বৈশিষ্ট্য-চালিত: মডেলের কার্যকারিতা মূলত আপনি যে “বৈশিষ্ট্যগুলি” প্রদান করেন তার উপর নির্ভর করে। এখন গণনা করা আলফা ফ্যাক্টরটি একটি ভাল শুরু, এবং আমরা ভবিষ্যদ্বাণী ক্ষমতা উন্নত করার জন্য আরও এই জাতীয় বৈশিষ্ট্য তৈরি করতে পারি। এবার আমি ৩টি বৈশিষ্ট্যের রূপরেখা দিয়ে শুরু করেছি: ১: উচ্চ-ফ্রিকোয়েন্সি অর্ডার প্রবাহ বৈশিষ্ট্য: alpha_1min: গত মিনিটের সমস্ত টিকগুলির উপর ভিত্তি করে অর্ডার প্রবাহ ভারসাম্যহীনতা ফ্যাক্টর গণনা করা হয়েছে। alpha_5min: গত ৫ মিনিটের সমস্ত টিক এর উপর ভিত্তি করে অর্ডার প্রবাহ ভারসাম্যহীনতা ফ্যাক্টর গণনা করা হয়েছে। alpha_15min: গত 15 মিনিটের সমস্ত টিকের উপর ভিত্তি করে অর্ডার প্রবাহ ভারসাম্যহীনতা ফ্যাক্টর গণনা করা হয়েছে। ofi_1min (অর্ডার প্রবাহ ভারসাম্যহীনতা): ১ মিনিটের মধ্যে (ক্রয়ের পরিমাণ / বিক্রয় পরিমাণ) অনুপাত। এটি আলফার চেয়ে আরও সরাসরি। vol_per_trade_1min: ১ মিনিটের মধ্যে প্রতি ট্রেডের গড় পরিমাণ। বাজারে বড় অর্ডারের প্রভাব পড়ার লক্ষণ।

২: মূল্য এবং অস্থিরতার বৈশিষ্ট্য: log_return_5min: গত ৫ মিনিটে লগারিদমিক রিটার্ন রেট, log(Pt / P{t-5min})। অস্থিরতা_১৫ মিনিট: গত ১৫ মিনিটে লগের আদর্শ বিচ্যুতি ফিরে আসে, যা স্বল্পমেয়াদী অস্থিরতার একটি পরিমাপ। atr_14 (গড় সত্য পরিসর): গত ১৪টি ১-মিনিটের ক্যান্ডেলস্টিকের উপর ভিত্তি করে ATR মান, যা একটি ক্লাসিক অস্থিরতা নির্দেশক। rsi_14 (আপেক্ষিক শক্তি সূচক): এটি গত ১৪টি ১-মিনিটের ক্যান্ডেলস্টিকের RSI মানের উপর ভিত্তি করে অতিরিক্ত কেনা এবং অতিরিক্ত বিক্রিত অবস্থার একটি পরিমাপ।

৩: সময়ের বৈশিষ্ট্য: দিনের_ঘন্টা: বর্তমান ঘন্টা (০-২৩)। বিভিন্ন সময়কালে বাজারগুলি ভিন্নভাবে আচরণ করে (যেমন, এশিয়ান/ইউরোপীয়/আমেরিকান সেশন)। সপ্তাহের_দিন: সপ্তাহের দিন (০-৬)। সপ্তাহান্তে এবং কাজের দিনগুলিতে বিভিন্ন ওঠানামার ধরণ থাকে।

def calculate_features_and_labels(klines):
    """
    核心函数
    """
    features = []
    labels = []
    
    # 为了计算RSI等指标,我们需要价格序列
    close_prices = [k['close'] for k in klines]
    
    # 从第30根K线开始,因为需要足够的前置数据
    for i in range(30, len(klines) - PREDICT_HORIZON):
        # 1. 价格与波动率特征
        price_change_15m = (klines[i]['close'] - klines[i-15]['close']) / klines[i-15]['close']
        volatility_30m = np.std(close_prices[i-30:i])
        
        # 计算RSI
        diffs = np.diff(close_prices[i-14:i+1])
        gains = np.sum(diffs[diffs > 0]) / 14
        losses = -np.sum(diffs[diffs < 0]) / 14
        rs = gains / (losses + 1e-10)
        rsi_14 = 100 - (100 / (1 + rs))
        
        # 2. 时间特征
        dt_object = datetime.fromtimestamp(klines[i]['ts'] / 1000)
        hour_of_day = dt_object.hour
        day_of_week = dt_object.weekday()
        
        # 组合所有特征
        current_features = [price_change_15m, volatility_30m, rsi_14, hour_of_day, day_of_week]
        features.append(current_features)
        
        # 3. 数据标注
        future_price = klines[i + PREDICT_HORIZON]['close']
        current_price = klines[i]['close']
        
        if future_price > current_price * (1 + SPREAD_THRESHOLD):
            labels.append(0) # 涨
        elif future_price < current_price * (1 - SPREAD_THRESHOLD):
            labels.append(1) # 跌
        else:
            labels.append(2) # 横盘

তারপর উত্থান, পতন এবং পার্শ্বাভিমুখের মধ্যে পার্থক্য করার জন্য তিনটি বিভাগ ব্যবহার করুন।

ফিচার স্ক্রিনিংয়ের মূল ধারণা: “ভালো সতীর্থদের” খুঁজে বের করুন এবং “খারাপ সতীর্থদের” বাদ দিন

আমাদের লক্ষ্য হল এমন কিছু বৈশিষ্ট্য খুঁজে বের করা যা:

  • উচ্চ প্রাসঙ্গিকতা: প্রতিটি বৈশিষ্ট্যের ভবিষ্যতের মূল্য পরিবর্তনের (আমাদের লক্ষ্য লেবেল) সাথে একটি শক্তিশালী সম্পর্ক রয়েছে।
  • কম রিডানডেন্সি: বৈশিষ্ট্যগুলিতে খুব বেশি ডুপ্লিকেট তথ্য থাকা উচিত নয়। উদাহরণস্বরূপ, “৫-মিনিটের ভরবেগ” এবং “৬-মিনিটের ভরবেগ” অত্যন্ত একই রকম। উভয়কে অন্তর্ভুক্ত করলে মডেলটির খুব বেশি উন্নতি হবে না এবং এমনকি শব্দও হতে পারে।
  • স্থিতিশীলতা: সময়ের সাথে সাথে কোনও বৈশিষ্ট্যের বৈধতা খুব দ্রুত পরিবর্তিত হতে পারে না। যে বৈশিষ্ট্যটি কেবল একদিন বৈধ থাকে তা বিপজ্জনক।
def run_analysis_report(X, y, clf, scaler):
    Log("--- 模型分析报告 ---", "info")
    Log("1. 特征重要性 (代理模型: 随机森林):")
    rf = RandomForestClassifier(n_estimators=50, random_state=42); rf.fit(X, y)
    importances = sorted(zip(g_feature_names, rf.feature_importances_), key=lambda x: x[1], reverse=True)
    for name, importance in importances: Log(f"   - {name}: {importance:.4f}")
    Log("2. 特征与标签的互信息:"); mi_scores = mutual_info_classif(X, y)
    mi_scores = sorted(zip(g_feature_names, mi_scores), key=lambda x: x[1], reverse=True)
    for name, score in mi_scores: Log(f"   - {name}: {score:.4f}")
    Log("3. 历史数据回测表现:"); y_pred = clf.predict(scaler.transform(X)); accuracy = accuracy_score(y, y_pred)
    Log(f"   - **历史回测总胜率: {accuracy * 100:.2f}%**", "success")
    Log("4. 混淆矩阵 (行:真实, 列:预测):"); cm = confusion_matrix(y, y_pred)
    Log("      预测涨(0) 预测跌(1) 预测平(2)"); Log(f"真实涨(0): {cm[0] if len(cm) > 0 else [0,0,0]}")
    Log(f"真实跌(1): {cm[1] if len(cm) > 1 else [0,0,0]}"); Log(f"真实平(2): {cm[2] if len(cm) > 2 else [0,0,0]}")
    profit_chart = Chart({'title': {'text': f'历史回测净值曲线 (胜率: {accuracy*100:.2f}%)'}}); profit_chart.reset(); balance = 1
    for i in range(len(y)):
        if y_pred[i] == y[i] and y[i] != 2: balance *= (1 + 0.01)
        elif y_pred[i] != y[i] and y_pred[i] != 2: balance *= (1 - 0.01)
        profit_chart.add(i, balance)
    Log("--- 报告结束, 5秒后进入实盘预测 ---", "info"); Sleep(5000)

প্রক্রিয়াটি হল: তথ্য সংগ্রহ করা হচ্ছে মেশিন লার্নিং: মূল্য প্রবণতা পূর্বাভাস (1) বৈশিষ্ট্যের গুরুত্ব মেশিন লার্নিং: মূল্য প্রবণতা পূর্বাভাস (1) বৈশিষ্ট্য এবং লেবেলের মধ্যে পারস্পরিক তথ্য এবং ব্যাকটেস্ট তথ্য মেশিন লার্নিং: মূল্য প্রবণতা পূর্বাভাস (1)

আমি প্রথমে ভেবেছিলাম ৬৫% জয়ের হার যথেষ্ট হবে, কিন্তু আমি আশা করিনি যে এটি ৮১.৯৮% এ পৌঁছাবে। আমার প্রথম প্রতিক্রিয়া হওয়া উচিত: “এটা দারুন, কিন্তু এটা সত্য হতে খুব ভালো। এখানে অবশ্যই অন্বেষণ করার মতো কিছু আছে।”

1বিশ্লেষণ প্রতিবেদনের গভীর ব্যাখ্যা, প্রতিবেদনের বিষয়বস্তু একের পর এক ব্যাখ্যা করা:

  1. বৈশিষ্ট্যের গুরুত্ব এবং পারস্পরিক তথ্য: অস্থিরতা_৩০মি এবং মূল্য_পরিবর্তন_১৫মি হল সবচেয়ে গুরুত্বপূর্ণ বৈশিষ্ট্য। এটি যুক্তিসঙ্গত, যা ইঙ্গিত করে যে সাম্প্রতিক বাজারের প্রবণতা এবং অস্থিরতা ভবিষ্যতের সবচেয়ে শক্তিশালী ভবিষ্যদ্বাণীকারী। দিনের_ঘন্টা_ও কিছুটা অবদান রাখে, যা নির্দেশ করে যে মডেলটি দিনের বিভিন্ন সময়ে ট্রেডিং প্যাটার্নগুলি ধারণ করে। rsi_14 এবং day_of_week এর অবদান প্রায় 0, যা ইঙ্গিত দেয় যে বর্তমান ডেটাসেট এবং বৈশিষ্ট্য সংমিশ্রণের অধীনে এই দুটি বৈশিষ্ট্য “পিগ টিমমেট” হতে পারে। মডেলটি সহজ করতে এবং শব্দ প্রতিরোধ করতে আমরা ভবিষ্যতে এগুলি অপসারণের কথা বিবেচনা করতে পারি।
  2. কনফিউশন ম্যাট্রিক্স (এটি অনেক তথ্য!) প্রকৃত বৃদ্ধি (০):[১১ ০ ৩৩] -> ৪৪টি (১১+০+৩৩) প্রকৃত সমাবেশের মধ্যে, মডেলটি ১১টি সঠিকভাবে ভবিষ্যদ্বাণী করেছে, কিন্তু ৩৩ বার এটিকে “একত্রীকরণ” হিসাবে ভুল ভবিষ্যদ্বাণী করেছে। প্রকৃত পতন (১):[০ ১০ ৪৪] -> ৫৪টি (০+১০+৪৪) প্রকৃত পতনের মধ্যে, মডেলটি সঠিকভাবে ১০টি ভবিষ্যদ্বাণী করেছে, কিন্তু ৪৪ বার এটিকে “পাশের প্রবণতা” হিসেবে ভুল ভবিষ্যদ্বাণী করেছে। রিয়েল পিং (২):[3 2 352] -> 357টি (3+2+352) প্রকৃত একত্রীকরণের মধ্যে, মডেলটি সঠিকভাবে 352টির ভবিষ্যদ্বাণী করেছে!
  3. ঐতিহাসিক ব্যাকটেস্টের মোট জয়ের হার: ৮১.৯৮% এই উচ্চ জয়ের হারের মূল উৎস হল “একত্রীকরণ” পূর্বাভাসে মডেলটির অত্যন্ত উচ্চ নির্ভুলতা! মোট প্রায় ৪৫৫টি নমুনার মধ্যে, ৩৫০টিরও বেশি ছিল একত্রীকরণ বাজার, এবং মডেলটি সেগুলিকে প্রায় নিখুঁতভাবে চিহ্নিত করেছে। এটি নিজেই একটি অত্যন্ত মূল্যবান ক্ষমতা! একটি মডেল যা আপনাকে সঠিকভাবে বলতে পারে “এখনই স্থানান্তর না করাই ভালো”, তা আপনাকে অনেক ফি এবং অবৈধ লেনদেন বাঁচাতে সাহায্য করতে পারে।

2কেন প্রকৃত জয়ের হার ৮১.৯৮% এর কম হতে পারে?

  1. “একত্রীকরণ” এর সংজ্ঞা খুবই শিথিল: আমাদের SPREAD_THRESHOLD হল 0.5%। 15 মিনিটের মধ্যে 0.5% এর বেশি দামের ওঠানামা করা বেশ সাধারণ। ফলস্বরূপ, “একত্রীকরণ” নমুনাগুলি আমাদের ডেটাসেটের বিশাল সংখ্যাগরিষ্ঠ (প্রায় 80%) জন্য দায়ী। মডেলটি চতুরতার সাথে শিখেছে, “যখন আমি নিশ্চিত নই, তখন উচ্চ নির্ভুলতার সাথে ‘একত্রীকরণ’ অনুমান করুন।” এটি পরিসংখ্যানগতভাবে সঠিক, কিন্তু ট্রেডিংয়ে, আমরা দামের গতিবিধির পূর্বাভাস দেওয়ার বিষয়ে বেশি উদ্বিগ্ন।
  2. উত্থান-পতনের পূর্বাভাস দেওয়ার ক্ষমতা: ঊর্ধ্বমুখী প্রবণতার পূর্বাভাসের জয়ের হার: মডেলটি ১১ + ০ + ৩ = ১৪টি ঊর্ধ্বমুখী প্রবণতার পূর্বাভাস দিয়েছিল, যার মধ্যে মাত্র ১১টি সঠিক ছিল। জয়ের হার হল ১১ / ১৪ = ৭৮.৫%। চমৎকার! পূর্বাভাসিত পতনের জয়ের হার: মডেলটি 0 + 10 + 2 = 12 পতনের পূর্বাভাস দিয়েছিল এবং এর মধ্যে 10টি সঠিক ছিল। জয়ের হার হল 10 / 12 = 83.3%। আবারও, খুবই চিত্তাকর্ষক!
  3. ইন-স্যাম্পল ওভারফিটিং: এই পরীক্ষাটি মডেলের “পরিচিত” ডেটার উপর করা হয় (অর্থাৎ, প্রশিক্ষণ এবং পরীক্ষার জন্য ব্যবহৃত ডেটা)। এটি একজন শিক্ষার্থীকে তারা যে পরীক্ষাটি সম্পন্ন করেছে তা করতে বলার মতো; স্কোর সাধারণত বেশি হবে। নতুন, অদেখা ডেটাতে (লাইভ ট্রেডিং) মডেলের কর্মক্ষমতা প্রায় সবসময় এই স্কোরের চেয়ে কম হবে।

আমাদের কাছে এখন একটি প্রাথমিক, তবুও সম্ভাব্য বিশাল, “আলফা মডেল” রয়েছে। যদিও আমরা ৮১.৯৮% সংখ্যাটিকে ভবিষ্যতের জন্য একটি বাস্তবসম্মত ভবিষ্যদ্বাণী হিসাবে সরাসরি ব্যাখ্যা করতে পারি না, এটি একটি শক্তিশালী ইতিবাচক সংকেত, যা প্রমাণ করে যে ডেটাতে অনুমানযোগ্য নিদর্শন বিদ্যমান এবং আমাদের কাঠামো সফলভাবে সেগুলি ধরে রেখেছে! আমাদের এখন মনে হচ্ছে আমরা পাহাড়ের পাদদেশে উচ্চমানের সোনার আকরিকের প্রথম টুকরোটি খুঁজে পেয়েছি। আমাদের পরবর্তী পদক্ষেপ হল তাৎক্ষণিকভাবে এটি বিক্রি করা নয়, বরং আরও বিশেষায়িত সরঞ্জাম এবং কৌশল (বৈশিষ্ট্যগুলি অপ্টিমাইজ করা এবং পরামিতিগুলি সামঞ্জস্য করা) ব্যবহার করে আরও দক্ষতার সাথে এবং স্থিতিশীলভাবে পুরো পাহাড় খনন করা।

এবার “অণুজগতে” যুদ্ধের কুয়াশা পরিচয় করিয়ে দেওয়া যাক — অর্ডার প্রবাহ এবং অর্ডার বইয়ের বৈশিষ্ট্য ধাপ ১: ডেটা সংগ্রহ আপগ্রেড করুন - আরও গভীর চ্যানেলগুলিতে সাবস্ক্রাইব করুন অর্ডার বুকের তথ্য পেতে, ওয়েবসকেট সংযোগ পদ্ধতিটি শুধুমাত্র aggTrade (ডিল) তে সাবস্ক্রাইব করার পরিবর্তে aggTrade এবং depth (depth) উভয়ের সাবস্ক্রাইব করার পদ্ধতিতে পরিবর্তন করতে হবে। এর জন্য আমাদের আরও সাধারণ মাল্টি-স্ট্রিম সাবস্ক্রিপশন URL ব্যবহার করতে হবে। ধাপ ২: ফিচার ইঞ্জিনিয়ারিং আপগ্রেড করুন - “সমুদ্র, স্থল এবং বায়ু” এর জন্য একটি ট্রিনিটি ফিচার ম্যাট্রিক্স তৈরি করুন আমরা calculate_features_and_labels ফাংশনে নিম্নলিখিত নতুন বৈশিষ্ট্যগুলি যুক্ত করব:

  1. অর্ডার প্রবাহের বৈশিষ্ট্য (আলফা - সংক্ষিপ্ত): alpha_15m: ১৫ মিনিটের অর্ডার প্রবাহ ভারসাম্যহীনতার ফ্যাক্টর। এটি হল মূল অর্ডার প্রবাহ মেট্রিক যা আমরা আগে আলোচনা করেছি।
  2. অর্ডার বইয়ের বৈশিষ্ট্য (বই - সেনাবাহিনী): wobi_10s: গত ১০ সেকেন্ড ধরে ওয়েটেড অর্ডার বুকের ভারসাম্যহীনতা। এটি একটি অত্যন্ত উচ্চ-ফ্রিকোয়েন্সি সূচক যা বাজারে ক্রয়-বিক্রয়ের চাপ পরিমাপ করে। spread_10s: গত ১০ সেকেন্ডের গড় বিড-আস্ক স্প্রেড। স্বল্পমেয়াদী তরলতা প্রতিফলিত করে।
  3. মূল বৈশিষ্ট্য (মূল্য - নেভি): আমরা পূর্ববর্তী সংস্করণের সেরা পারফর্মিং বৈশিষ্ট্যগুলি রাখব এবং সেগুলিকে অপ্টিমাইজ করব।

এই নতুন বৈশিষ্ট্য ম্যাট্রিক্সটি একটি যৌথ যুদ্ধ কমান্ডের মতো, যা একই সাথে “সমুদ্র (মূল্যের প্রবণতা)”, “ভূমি (বাজারের অবস্থান)” এবং “বায়ু (লেনদেনের প্রভাব)” থেকে বাস্তব-সময়ের বুদ্ধিমত্তা উপলব্ধি করে এবং এর সিদ্ধান্ত গ্রহণের ক্ষমতা আগের চেয়ে অনেক উন্নত হবে। কোডটি নিম্নরূপ:


import json
import math
import time
import websocket
import threading
from datetime import datetime
import numpy as np
from sklearn import svm
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix, accuracy_score
from sklearn.feature_selection import mutual_info_classif
from sklearn.ensemble import RandomForestClassifier

# ========== 全局配置 ==========
TRAIN_BARS = 100
PREDICT_HORIZON = 15
SPREAD_THRESHOLD = 0.005
SYMBOL_FMZ = "ETH_USDT" 
SYMBOL_API = SYMBOL_FMZ.replace('_', '').lower()
WEBSOCKET_URL = f"wss://fstream.binance.com/stream?streams={SYMBOL_API}@aggTrade/{SYMBOL_API}@depth20@100ms"

# ========== 全局状态变量 ==========
g_model, g_scaler = None, None
g_klines_1min, g_ticks, g_order_book_history = [], [], []
g_last_kline_ts = 0
g_feature_names = ['price_change_15m', 'volatility_30m', 'rsi_14', 'hour_of_day', 
                   'alpha_15m', 'wobi_10s', 'spread_10s']

# ========== 特征工程与模型训练 ==========
def calculate_features_and_labels(klines, ticks, order_books_history, is_realtime=False):
    features, labels = [], []
    close_prices = [k['close'] for k in klines]
    
    # 根据是训练还是实时预测,决定循环范围
    start_index = 30
    end_index = len(klines) - PREDICT_HORIZON if not is_realtime else len(klines)

    for i in range(start_index, end_index):
        kline_start_ts = klines[i]['ts']
        
        # --- 特征计算部分 ---
        price_change_15m = (klines[i]['close'] - klines[i-15]['close']) / klines[i-15]['close']
        volatility_30m = np.std(close_prices[i-30:i])
        diffs = np.diff(close_prices[i-14:i+1]); gains = np.sum(diffs[diffs > 0]) / 14; losses = -np.sum(diffs[diffs < 0]) / 14
        rsi_14 = 100 - (100 / (1 + gains / (losses + 1e-10)))
        dt_object = datetime.fromtimestamp(kline_start_ts / 1000)
        ticks_in_15m = [t for t in ticks if t['ts'] >= klines[i-15]['ts'] and t['ts'] < kline_start_ts]
        buy_vol = sum(t['qty'] for t in ticks_in_15m if t['side'] == 'buy'); sell_vol = sum(t['qty'] for t in ticks_in_15m if t['side'] == 'sell')
        alpha_15m = (buy_vol - sell_vol) / (buy_vol + sell_vol + 1e-10)
        books_in_10s = [b for b in order_books_history if b['ts'] >= kline_start_ts - 10000 and b['ts'] < kline_start_ts]
        if not books_in_10s: wobi_10s, spread_10s = 0, 0.0
        else:
            wobis, spreads = [], []
            for book in books_in_10s:
                if not book['bids'] or not book['asks']: continue
                bid_vol = sum(float(p[1]) for p in book['bids']); ask_vol = sum(float(p[1]) for p in book['asks'])
                wobis.append(bid_vol / (bid_vol + ask_vol + 1e-10))
                spreads.append(float(book['asks'][0][0]) - float(book['bids'][0][0]))
            wobi_10s = np.mean(wobis) if wobis else 0; spread_10s = np.mean(spreads) if spreads else 0
        current_features = [price_change_15m, volatility_30m, rsi_14, dt_object.hour, alpha_15m, wobi_10s, spread_10s]
        features.append(current_features)
        
        # --- 标签计算部分  ---
        if not is_realtime:
            future_price = klines[i + PREDICT_HORIZON]['close']; current_price = klines[i]['close']
            if future_price > current_price * (1 + SPREAD_THRESHOLD): labels.append(0)
            elif future_price < current_price * (1 - SPREAD_THRESHOLD): labels.append(1)
            else: labels.append(2)
            
    return np.array(features), np.array(labels)

def run_analysis_report(X, y, clf, scaler):
    Log("--- 模型分析报告 ---", "info")
    Log("1. 特征重要性 (代理模型: 随机森林):")
    rf = RandomForestClassifier(n_estimators=50, random_state=42); rf.fit(X, y)
    importances = sorted(zip(g_feature_names, rf.feature_importances_), key=lambda x: x[1], reverse=True)
    for name, importance in importances: Log(f"   - {name}: {importance:.4f}")
    Log("2. 特征与标签的互信息:"); mi_scores = mutual_info_classif(X, y)
    mi_scores = sorted(zip(g_feature_names, mi_scores), key=lambda x: x[1], reverse=True)
    for name, score in mi_scores: Log(f"   - {name}: {score:.4f}")
    Log("3. 历史数据回测表现:"); y_pred = clf.predict(scaler.transform(X)); accuracy = accuracy_score(y, y_pred)
    Log(f"   - **历史回测总胜率: {accuracy * 100:.2f}%**", "success")
    Log("4. 混淆矩阵 (行:真实, 列:预测):"); cm = confusion_matrix(y, y_pred)
    Log("      预测涨(0) 预测跌(1) 预测平(2)"); Log(f"真实涨(0): {cm[0] if len(cm) > 0 else [0,0,0]}")
    Log(f"真实跌(1): {cm[1] if len(cm) > 1 else [0,0,0]}"); Log(f"真实平(2): {cm[2] if len(cm) > 2 else [0,0,0]}")
    profit_chart = Chart({'title': {'text': f'历史回测净值曲线 (胜率: {accuracy*100:.2f}%)'}}); profit_chart.reset(); balance = 1
    for i in range(len(y)):
        if y_pred[i] == y[i] and y[i] != 2: balance *= (1 + 0.01)
        elif y_pred[i] != y[i] and y_pred[i] != 2: balance *= (1 - 0.01)
        profit_chart.add(i, balance)
    Log("--- 报告结束, 5秒后进入实盘预测 ---", "info"); Sleep(5000)

def train_and_analyze():
    global g_model, g_scaler, g_klines_1min, g_ticks, g_order_book_history
    MIN_REQUIRED_BARS = 30 + PREDICT_HORIZON
    if len(g_klines_1min) < MIN_REQUIRED_BARS:
        Log(f"K线数量({len(g_klines_1min)})不足以进行特征工程,需要至少 {MIN_REQUIRED_BARS} 根。", "warning"); return False
    Log("开始训练模型 (V2.2)...")
    X, y = calculate_features_and_labels(g_klines_1min, g_ticks, g_order_book_history)
    if len(X) < 50 or len(set(y)) < 3:
        Log(f"有效训练样本不足(X: {len(X)}, 类别: {len(set(y))}),无法训练。", "warning"); return False
    scaler = StandardScaler(); X_scaled = scaler.fit_transform(X)
    clf = svm.SVC(kernel='rbf', C=1.0, gamma='scale'); clf.fit(X_scaled, y)
    g_model, g_scaler = clf, scaler
    Log("模型训练完成!", "success")
    run_analysis_report(X, y, g_model, g_scaler)
    return True

def aggregate_ticks_to_kline(ticks):
    if not ticks: return None
    return {'ts': ticks[0]['ts'] // 60000 * 60000, 'open': ticks[0]['price'], 'high': max(t['price'] for t in ticks), 'low': min(t['price'] for t in ticks), 'close': ticks[-1]['price'], 'volume': sum(t['qty'] for t in ticks)}

def on_message(ws, message):
    global g_ticks, g_klines_1min, g_last_kline_ts, g_order_book_history
    try:
        payload = json.loads(message)
        data = payload.get('data', {}); stream = payload.get('stream', '')
        if 'aggTrade' in stream:
            trade_data = {'ts': int(data['T']), 'price': float(data['p']), 'qty': float(data['q']), 'side': 'sell' if data['m'] else 'buy'}
            g_ticks.append(trade_data)
            current_minute_ts = trade_data['ts'] // 60000 * 60000
            if g_last_kline_ts == 0: g_last_kline_ts = current_minute_ts
            if current_minute_ts > g_last_kline_ts:
                last_minute_ticks = [t for t in g_ticks if t['ts'] >= g_last_kline_ts and t['ts'] < current_minute_ts]
                if last_minute_ticks:
                    kline = aggregate_ticks_to_kline(last_minute_ticks); g_klines_1min.append(kline)
                    g_ticks = [t for t in g_ticks if t['ts'] >= current_minute_ts]
                g_last_kline_ts = current_minute_ts
        elif 'depth' in stream:
            book_snapshot = {'ts': int(data['E']), 'bids': data['b'], 'asks': data['a']}
            g_order_book_history.append(book_snapshot)
            if len(g_order_book_history) > 5000: g_order_book_history.pop(0)
    except Exception as e: Log(f"OnMessage Error: {e}")

def start_websocket():
    ws = websocket.WebSocketApp(WEBSOCKET_URL, on_message=on_message)
    wst = threading.Thread(target=ws.run_forever); wst.daemon = True; wst.start()
    Log("WebSocket多流订阅已启动...")

# ========== 主程序入口  ==========
def main():
    global TRAIN_BARS
    
    exchange.SetContractType("swap")
    start_websocket()
    Log("策略启动,进入数据收集中...")
    main.last_predict_ts = 0

    while True:
        if g_model is None:
            # --- 训练模式 ---
            if len(g_klines_1min) >= TRAIN_BARS:
                if not train_and_analyze():
                    Log("模型训练或分析失败,将增加50根K线后重试...", "error")
                    TRAIN_BARS += 50
            else:
                LogStatus(f"正在收集K线数据: {len(g_klines_1min)} / {TRAIN_BARS}")
        else:
            # --- **新功能:实时预测模式** ---
            if len(g_klines_1min) > 0 and g_klines_1min[-1]['ts'] > main.last_predict_ts:
                # 1. 标记已处理,防止重复预测
                main.last_predict_ts = g_klines_1min[-1]['ts']
                kline_time_str = datetime.fromtimestamp(main.last_predict_ts / 1000).strftime('%H:%M:%S')
                Log(f"检测到新K线 ({kline_time_str}),准备进行实时预测...")

                # 2. 检查是否有足够历史数据来为这根新K线计算特征
                if len(g_klines_1min) < 30: # 至少需要30根历史K线
                    Log("历史K线不足,无法为当前新K线计算特征。", "warning")
                    continue

                # 3. 计算最新K线的特征
                # 我们只计算最后一条数据,所以传入 is_realtime=True
                latest_features, _ = calculate_features_and_labels(g_klines_1min, g_ticks, g_order_book_history, is_realtime=True)
                
                if latest_features.shape[0] == 0:
                    Log("无法为最新K线生成有效特征。", "warning")
                    continue
                
                # 4. 标准化并预测
                last_feature_vector = latest_features[-1].reshape(1, -1)
                last_feature_scaled = g_scaler.transform(last_feature_vector)
                prediction = g_model.predict(last_feature_scaled)[0]
                
                # 5. 展示预测结果
                prediction_text = ['**上涨**', '**下跌**', '盘整'][prediction]
                Log(f"==> 实时预测结果 ({kline_time_str}): 未来 {PREDICT_HORIZON} 分钟可能 {prediction_text}", "success" if prediction != 2 else "info")
                
                # 在这里,您可以根据 prediction 的结果,添加您的开平仓交易逻辑
                # 例如: if prediction == 0: exchange.Buy(...)

            else:
                LogStatus(f"模型已就绪,等待新K线... 当前K线数: {len(g_klines_1min)}")
        
        Sleep(1000) # 每秒检查一次是否有新K线

এই কোডটির জন্য অনেক K-লাইন গণনার প্রয়োজন। এই প্রতিবেদনটি অনেক মূল্যবান, কারণ এটি আমাদের মডেলটির “চিন্তাভাবনা” এবং “চরিত্র” সম্পর্কে বলে।

  1. ঐতিহাসিক ব্যাকটেস্টের মোট জয়ের হার: ৯৩.৩৩% এটি একটি অত্যন্ত চিত্তাকর্ষক সংখ্যা! যদিও আমাদের এটিকে বস্তুনিষ্ঠভাবে দেখতে হবে (এটি একটি নমুনা পরীক্ষা), এটি আমাদের নতুন যুক্ত হওয়া অর্ডার প্রবাহ এবং অর্ডার বই বৈশিষ্ট্যগুলির অপরিসীম ভবিষ্যদ্বাণীমূলক শক্তি স্পষ্টভাবে প্রদর্শন করে! মডেলটি ঐতিহাসিক তথ্যে খুব শক্তিশালী নিদর্শন খুঁজে পেয়েছে।
  2. বৈশিষ্ট্যের গুরুত্ব এবং পারস্পরিক তথ্য রাজার জন্ম: volatility_15m (অস্থিরতা) এবং price_change_5m (মূল্য পরিবর্তন) এখনও একেবারে মূল, যা প্রত্যাশা অনুযায়ী। উদীয়মান তারকা: rsi_14 এর গুরুত্ব উল্লেখযোগ্যভাবে বৃদ্ধি পেয়েছে! এটি ইঙ্গিত দেয় যে 5 মিনিটের কম সময়সীমায়, RSI এর “অতিরিক্ত কেনা/অতিরিক্ত বিক্রিত” সেন্টিমেন্ট সূচকটি আরও অর্থবহ হয়ে উঠেছে। সম্ভাব্যভাবে, wobi_10s (অর্ডার বইয়ের ভারসাম্যহীনতা) এবং spread_10s (স্প্রেড) এরও কিছু অবদান রয়েছে। এটি খুবই উৎসাহব্যঞ্জক এবং ইঙ্গিত দেয় যে আমাদের মাইক্রোস্ট্রাকচার বৈশিষ্ট্যগুলি কাজ শুরু করেছে! প্রতিফলন: alpha_5m (ক্রম প্রবাহ) এর অবদান প্রায় শূন্য। এটি আমাদের আলফা গণনা পদ্ধতির অতি সরলীকরণের কারণে হতে পারে, অথবা 5-মিনিটের আলফা এবং 5-মিনিটের মূল্য পরিবর্তনের মধ্যে খুব বেশি ওভারল্যাপিং তথ্য রয়েছে। ভবিষ্যতে এটি আমাদের জন্য একটি গুরুত্বপূর্ণ অপ্টিমাইজেশন পয়েন্ট।
  3. কনফিউশন ম্যাট্রিক্স (সাফল্যের মূল প্রমাণ!) প্রকৃত বৃদ্ধি (০):[২২ ০] -> ২২টি বাস্তব র‍্যালির সবগুলোতেই, মডেলটি ১০০% সঠিকভাবে ভবিষ্যদ্বাণী করেছে, কোনও ভুল ছাড়াই! প্রকৃত পতন (১):[2 6] -> 8টি প্রকৃত পতনের মধ্যে, মডেলটি সঠিকভাবে 6টি পূর্বাভাস দিয়েছে এবং 2টি মিস করেছে (বৃদ্ধি ভেবে ভুল করেছে)। ব্যাখ্যা: এই মডেলটি একটি অত্যন্ত আকর্ষণীয় ব্যক্তিত্ব প্রদর্শন করে: এটি একটি অত্যন্ত শক্তিশালী বুলিশ ডিটেক্টর, প্রায় নিখুঁতভাবে বুলিশ সংকেত ক্যাপচার করে। এটি বিয়ারিশ ট্রেন্ড সনাক্তকরণেও ভালো কাজ করে (68 = 75% নির্ভুলতা), তবে মাঝে মাঝে বিয়ারিশ ট্রেন্ডকে আপট্রেন্ড ভেবে ভুল করে।

তারপর কী হবে “ট্রেডিং সিগন্যাল স্টেট মেশিন” প্রবর্তন করা হচ্ছে এটি এই আপগ্রেডের মূল এবং সবচেয়ে উদ্ভাবনী অংশ। কৌশলটির বর্তমান “পজিশন” স্ট্যাটাস পরিচালনা করার জন্য আমরা g_active_signal এর মতো একটি গ্লোবাল স্টেট ভ্যারিয়েবল প্রবর্তন করব (মনে রাখবেন এটি শুধুমাত্র একটি ভার্চুয়াল পজিশন স্ট্যাটাস এবং এতে প্রকৃত ট্রেডিং জড়িত নয়)। এই স্টেট মেশিনের কাজের যুক্তি নিম্নরূপ:

  1. প্রাথমিক অবস্থা: নিষ্ক্রিয়
  • যখন কৌশলটি এই অবস্থায় থাকবে, তখন এটি প্রতিটি নতুন K-লাইনের জন্য ভবিষ্যদ্বাণী করবে, ঠিক যেমনটি এটি এখন করে।
  • শাসনব্যবস্থার পরিবর্তন: মডেলটি একবার একটি স্পষ্ট সংকেত (যেমন, “উপরে”) ভবিষ্যদ্বাণী করলে, কৌশলটি হবে:
  • জার্নালে একটি একক, আকর্ষণীয় এন্ট্রি সিগন্যাল প্রিন্ট করে, উদাহরণস্বরূপ, 🎯 নতুন ট্রেডিং সিগন্যাল: পূর্বাভাস বৃদ্ধি! পর্যবেক্ষণ সময়কাল ১৫ মিনিট।
  • কৌশলের অবস্থা Idle থেকে In-Signal-এ পরিবর্তন করে। বর্তমান সংকেতের ট্রিগার সময় এবং দিক রেকর্ড করুন।
  1. অবস্থানের অবস্থা: ইন-সিগন্যাল
  • যখন কৌশলটি এই অবস্থায় থাকবে, তখন এটি নতুন K লাইনের পূর্বাভাস দেওয়া সম্পূর্ণরূপে বন্ধ করে দেবে। এটি আর প্রতিটি মিনিটের ওঠানামার পরোয়া করে না এবং “বুলেট উড়তে দাও” মোডে প্রবেশ করে।
  • এটি কেবলমাত্র সময় পরীক্ষা করে: সিগন্যালটি চালু হওয়ার পর থেকে 15 মিনিট (PREDICT_HORIZON এর দৈর্ঘ্য) অতিবাহিত হয়েছে কিনা।
  • রাষ্ট্রীয় পরিবর্তন: ১৫ মিনিটের পর্যবেক্ষণ সময়ের পরে, নীতিটি:
  • লগে একটি স্পষ্ট প্রস্থান সংকেত প্রিন্ট করুন, যেমন 🏁 সংকেত সময়ের সমাপ্তি। কৌশলটি পুনরায় সেট করুন এবং নতুন সুযোগগুলি সন্ধান করুন…
  • কৌশলের অবস্থা হোল্ডিং থেকে আইডলে পরিবর্তন করে।
  • এই মুহুর্তে, কৌশলটি আবার নতুন কে-লাইনের পূর্বাভাস দেওয়া শুরু করবে এবং পরবর্তী ট্রেডিং সুযোগের সন্ধান করবে।

এই সহজ স্টেট মেশিনের সাহায্যে, আমরা নিখুঁতভাবে প্রয়োজনীয়তাগুলি অর্জন করেছি: একটি সংকেত, একটি সম্পূর্ণ পর্যবেক্ষণ চক্র, এবং এই সময়কালে কোনও হস্তক্ষেপের তথ্য নেই।

”` import json import math import time import websocket import threading from datetime import datetime import numpy as np from sklearn import svm from sklearn.preprocessing import StandardScaler from sklearn.metrics import confusion_matrix, accuracy_score from sklearn.feature_selection import mutual_info_classif from sklearn.ensemble import RandomForestClassifier

========== 全局配置 ==========

TRAIN_BARS = 200 #需要更多初始数据 PREDICT_HORIZON = 15 # 回归15分钟预测周期 SPREAD_THRESHOLD = 0.005 # 适配15分钟周期的涨跌阈值 SYMBOL_FMZ = “ETH_USDT” SYMBOL_API = SYMBOLFMZ.replace(’’, “).lower() WEBSOCKET_URL = f”wss://fstream.binance.com/stream?streams={SYMBOL_API}@aggTrade/{SYMBOL_API}@depth20@100ms”

========== 全局状态变量 ==========

g_model, g_scaler = None, None g_klines_1min, g_ticks, g_order_book_history = [], [], [] g_last_kline_ts = 0 g_feature_names = [‘price_change_15m’, ‘volatility_30m’, ‘rsi_14’, ‘hour_of_day’, ‘alpha_15m’, ‘wobi_10s’, ‘spread_10s’]

新功能: 信号状态机

g_active_signal = {‘active’: False, ‘start_ts’: 0, ‘prediction’: -1}

========== 特征工程与模型训练 ==========

def calculate_features_and_labels(klines, ticks, order_books_history, is_realtime=False): features, labels = [], [] close_prices = [k[‘close’] for k in klines]

start_index = 30
end_index = len(klines) - PREDICT_HORIZON if not is_realtime else len(klines)

for i in range(start_index, end_index):
    kline_start_ts = klines[i]['ts']

    price_change_15m = (klines[i]['close'] - klines[i-15]['close']) / klines[i-15]['close']
    volatility_30m = np.std(close_prices[i-30:i])
    diffs = np.diff(close_prices[i-14:i+1]); gains = np.sum(diffs[diffs > 0]) / 14; losses = -np.sum(diffs[diffs < 0]) / 14
    rsi_14 = 100 - (100 / (1 + gains / (losses + 1e-10)))
    dt_object = datetime.fromtimestamp(kline_start_ts / 1000)
    ticks_in_15m = [t for t in ticks if t['ts'] >= klines[i-15]['ts'] and t['ts'] < kline_start_ts]
    buy_vol = sum(t['qty'] for t in ticks_in_15m if t['side'] == 'buy'); sell_vol = sum(t['qty'] for t in ticks_in_15m if t['side'] == 'sell')
    alpha_15m = (buy_vol - sell_vol) / (buy_vol + sell_vol + 1e-10)
    books_in_10s = [b for b in order_books_history if b['ts'] >= kline_start_ts - 10000 and b['ts'] < kline_start_ts]
    if not books_in_10s: wobi_10s, spread_10s = 0, 0.0
    else:
        wobis, spreads = [], []
        for book in books_in_10s:
            if not book['bids'] or not book['asks']: continue
            bid_vol = sum(float(p[1]) for p in book['bids']); ask_vol = sum(float(p[1]) for p in book['asks'])
            wobis.append(bid_vol / (bid_vol + ask_vol + 1e-10))
            spreads.append(float(book['asks'][0][0]) - float(book['bids'][0][0]))
        wobi_10s = np.mean(wobis) if wobis else 0; spread_10s = np.mean(spreads) if spreads else 0

    current_features = [price_change_15m, volatility_30m, rsi_14, dt_object.hour, alpha_15m, wobi_10s, spread_10s]

    if not is_realtime:
        future_price = klines[i + PREDICT_HORIZON]['close']; current_price = klines[i]['close']
        if future_price > current_price * (1 + SPREAD_THRESHOLD):
            labels.append(0); features.append(current_features)
        elif future_price < current_price * (1 - SPREAD_THRESHOLD):
            labels.append(1); features.append(current_features)
    else:
        features.append(current_features)

return np.array(features), np.array(labels)

def run_analysis_report(X, y, clf, scaler): Log(“— 模型分析报告 V2.5 (15分钟预测) —”, “info”) Log(“1. 特征重要性 (代理模型: 随机森林):”) rf = RandomForestClassifier(n_estimators=50, random_state=42); rf.fit(X, y) importances = sorted(zip(g_feature_names, rf.featureimportances), key=lambda x: x[1], reverse=True) for name, importance in importances: Log(f” - {name}: {importance:.4f}“) Log(“2. 特征与标签的互信息:”); mi_scores = mutual_info_classif(X, y) mi_scores = sorted(zip(g_feature_names, mi_scores), key=lambda x: x[1], reverse=True) for name, score in mi_scores: Log(f” - {name}: {score:.4f}“) Log(“3. 历史数据回测表现:”); y_pred = clf.predict(scaler.transform(X)); accuracy = accuracy_score(y, y_pred) Log(f” - 历史回测总胜率: {accuracy * 100:.2f}%”, “success”) Log(“4. 混淆矩阵 (行:真实, 列:预测):”); cm = confusion_matrix(y, y_pred) Log(” 预测涨(0) 预测跌(1)“); Log(f”真实涨(0): {cm[0] if len(cm) > 0 else [0,0]}“) Log(f”真实跌(1): {cm[1] if len(cm) > 1 else [0,0]}“) profit_chart = Chart({‘title’: {‘text’: f’历史回测净值曲线 (胜率: {accuracy*100:.2f}%)‘}}); profit_chart.reset(); balance = 1 for i in range(len(y)): if y_pred[i] == y[i]: balance *= (1 + 0.01) else: balance *= (1 - 0.01) profit_chart.add(i, balance) Log(”— 报告结束, 5秒后进入实盘预测 —“, “info”); Sleep(5000)

def train_and_analyze(): global g_model, g_scaler, g_klines_1min, g_ticks, g_order_book_history MIN_REQUIRED_BARS = 30 + PREDICT_HORIZON if len(g_klines_1min) < MIN_REQUIRED_BARS: Log(f”K线数量({len(g_klines_1min)})不足以进行特征工程,需要至少 {MIN_REQUIRED_BARS} 根。”, “warning”); return False Log(“开始训练模型 (V2.5)…”) X, y = calculate_features_and_labels(g_klines_1min, g_ticks, g_order_book_history) if len(X) < 20 or len(set(y)) < 2: Log(f”有效涨跌样本不足(X: {len(X)}, 类别: {len(set(y))}),无法训练。”, “warning”); return False scaler = StandardScaler(); X_scaled = scaler.fit_transform(X) clf = svm.SVC(kernel=‘rbf’, C=1.0, gamma=‘scale’); clf.fit(X_scaled, y) g_model, g_scaler = clf, scaler Log(“模型训练完成!”, “success”) run_analysis_report(X, y, g_model, g_scaler) return True

========== WebSocket实时数据处理 ==========

def aggregate_ticks_to_kline(ticks): if not ticks: return None return {‘ts’: ticks[0][‘ts’] // 60000 * 60000, ‘open’: ticks[0][‘price’], ‘high’: max(t[‘price’] for t in ticks), ‘low’: min(t[‘price’] for t in ticks), ‘close’: ticks[-1][‘price’], ‘volume’: sum(t[‘qty’] for t in ticks)}

def on_message(ws, message): global g_ticks, g_klines_1min, g_last_kline_ts, g_order_book_history try: payload = json.loads(message) data = payload.get(‘data’, {}); stream = payload.get(‘stream’, “) if ‘aggTrade’ in stream: trade_data = {‘ts’: int(data[’T’]), ‘price’: float(data[‘p’]), ‘qty’: float(data[‘q’]), ‘side’: ‘sell’ if data[’m’] else ‘buy’} g_ticks.append(trade_data) current_minute_ts = trade_data[‘ts’] // 60000 * 60000 if g_last_kline_ts == 0: g_last_kline_ts = current_minute_ts if current_minute_ts > g_last_kline_ts: last_minute_ticks = [t for t in g_ticks if t[‘ts’] >= g_last_kline_ts and t[‘ts’] < current_minute_ts] if last_minute_ticks: kline = aggregate_ticks_to_kline(last_minute_ticks); g_klines_1min.append(kline) g_ticks = [t for t in g_ticks if t[‘ts’] >= current_minute_ts] g_last_kline_ts = current_minute_ts elif ‘depth’ in stream: book_snapshot = {‘ts’: int(data[‘E’]), ‘bids’: data[‘b’], ‘asks’: data[‘a’]} g_order_book_history.append(book_snaps