4
Подписаться
1271
Подписчики

Научить вас шаг за шагом писать версию синтеза K-line на Python

Создано: 2019-12-21 09:38:26, Обновлено: 2024-12-15 15:59:54
comments   4
hits   2834

Научить вас шаг за шагом писать версию синтеза K-line на Python

Научить вас шаг за шагом писать версию синтеза K-line на Python

При написании и использовании стратегий часто используются некоторые необычные данные цикла K-линии. Однако биржи и источники данных не предоставляют данные за эти периоды. Его можно синтезировать только с использованием данных существующих циклов. Алгоритм синтеза уже имеет версию JavaScript (Связь), на самом деле, перенести код JavaScript на версию Python очень просто. Далее давайте напишем версию алгоритма синтеза K-line на Python.

JavaScript-версия

  function GetNewCycleRecords (sourceRecords, targetCycle) {    // K线合成函数
      var ret = []
      
      // 首先获取源K线数据的周期
      if (!sourceRecords || sourceRecords.length < 2) {
          return null
      }
      var sourceLen = sourceRecords.length
      var sourceCycle = sourceRecords[sourceLen - 1].Time - sourceRecords[sourceLen - 2].Time

      if (targetCycle % sourceCycle != 0) {
          Log("targetCycle:", targetCycle)
          Log("sourceCycle:", sourceCycle)
          throw "targetCycle is not an integral multiple of sourceCycle."
      }

      if ((1000 * 60 * 60) % targetCycle != 0 && (1000 * 60 * 60 * 24) % targetCycle != 0) {
          Log("targetCycle:", targetCycle)
          Log("sourceCycle:", sourceCycle)
          Log((1000 * 60 * 60) % targetCycle, (1000 * 60 * 60 * 24) % targetCycle)
          throw "targetCycle cannot complete the cycle."
      }

      var multiple = targetCycle / sourceCycle


      var isBegin = false 
      var count = 0
      var high = 0 
      var low = 0 
      var open = 0
      var close = 0 
      var time = 0
      var vol = 0
      for (var i = 0 ; i < sourceLen ; i++) {
          // 获取 时区偏移数值
          var d = new Date()
          var n = d.getTimezoneOffset()

          if (((1000 * 60 * 60 * 24) - sourceRecords[i].Time % (1000 * 60 * 60 * 24) + (n * 1000 * 60)) % targetCycle == 0) {
              isBegin = true
          }

          if (isBegin) {
              if (count == 0) {
                  high = sourceRecords[i].High
                  low = sourceRecords[i].Low
                  open = sourceRecords[i].Open
                  close = sourceRecords[i].Close
                  time = sourceRecords[i].Time
                  vol = sourceRecords[i].Volume

                  count++
              } else if (count < multiple) {
                  high = Math.max(high, sourceRecords[i].High)
                  low = Math.min(low, sourceRecords[i].Low)
                  close = sourceRecords[i].Close
                  vol += sourceRecords[i].Volume

                  count++
              }

              if (count == multiple || i == sourceLen - 1) {
                  ret.push({
                      High : high,
                      Low : low,
                      Open : open,
                      Close : close,
                      Time : time,
                      Volume : vol,
                  })
                  count = 0
              }
          }
      }

      return ret 
  }

Если есть алгоритм JavaScript, его можно перевести и пересадить на Python построчно. Когда вы сталкиваетесь со встроенными функциями JavaScript или присущими ему методами, просто ищите соответствующие методы в Python, поэтому пересадка относительно проста. Логика алгоритма точно такая же, за исключением вызова функции JavaScriptvar n = d.getTimezoneOffset()При портировании на Python используйте библиотеку времени Pythonn = time.altzoneзаменять. Другие различия заключаются только в синтаксисе языка (например, использование циклов for, различия в булевых значениях, различия в использовании логических «И», «НЕ», «ИЛИ» и т. д.).

Трансплантированный код Python:

import time

def GetNewCycleRecords(sourceRecords, targetCycle):
    ret = []

    # 首先获取源K线数据的周期
    if not sourceRecords or len(sourceRecords) < 2 : 
        return None

    sourceLen = len(sourceRecords)
    sourceCycle = sourceRecords[-1]["Time"] - sourceRecords[-2]["Time"]

    if targetCycle % sourceCycle != 0 :
        Log("targetCycle:", targetCycle)
        Log("sourceCycle:", sourceCycle)
        raise "targetCycle is not an integral multiple of sourceCycle."

    if (1000 * 60 * 60) % targetCycle != 0 and (1000 * 60 * 60 * 24) % targetCycle != 0 : 
        Log("targetCycle:", targetCycle)
        Log("sourceCycle:", sourceCycle)
        Log((1000 * 60 * 60) % targetCycle, (1000 * 60 * 60 * 24) % targetCycle)
        raise "targetCycle cannot complete the cycle."
    
    multiple = targetCycle / sourceCycle

    isBegin = False
    count = 0 
    barHigh = 0 
    barLow = 0 
    barOpen = 0
    barClose = 0 
    barTime = 0 
    barVol = 0 

    for i in range(sourceLen) : 
        # 获取时区偏移数值
        n = time.altzone        

        if ((1000 * 60 * 60 * 24) - (sourceRecords[i]["Time"] * 1000) % (1000 * 60 * 60 * 24) + (n * 1000)) % targetCycle == 0 :
            isBegin = True

        if isBegin : 
            if count == 0 : 
                barHigh = sourceRecords[i]["High"]
                barLow = sourceRecords[i]["Low"]
                barOpen = sourceRecords[i]["Open"]
                barClose = sourceRecords[i]["Close"]
                barTime = sourceRecords[i]["Time"]
                barVol = sourceRecords[i]["Volume"]
                count += 1
            elif count < multiple : 
                barHigh = max(barHigh, sourceRecords[i]["High"])
                barLow = min(barLow, sourceRecords[i]["Low"])
                barClose = sourceRecords[i]["Close"]
                barVol += sourceRecords[i]["Volume"]
                count += 1

            if count == multiple or i == sourceLen - 1 :
                ret.append({
                    "High" : barHigh,
                    "Low" : barLow,
                    "Open" : barOpen,
                    "Close" : barClose,
                    "Time" : barTime,
                    "Volume" : barVol,
                })
                count = 0
    
    return ret 

# 测试
def main():
    while True:
        r = exchange.GetRecords()
        r2 = GetNewCycleRecords(r, 1000 * 60 * 60 * 4)      

        ext.PlotRecords(r2, "r2")                                 
        Sleep(1000)                                             

тест

График рынка Huobi Научить вас шаг за шагом писать версию синтеза K-line на Python

Тестирование синтетического 4-часового графика Научить вас шаг за шагом писать версию синтеза K-line на Python

Приведенный выше код используется только в качестве учебного пособия. Если он используется в определенной стратегии, пожалуйста, измените и протестируйте его в соответствии с вашими потребностями. Если у вас есть какие-либо ОШИБКИ или предложения по улучшению, пожалуйста, оставьте сообщение, большое спасибо o^_^o