avatar of 发明者量化-小小梦 发明者量化-小小梦
Seguir Mensajes Privados
4
Seguir
1271
Seguidores

Te enseñaré paso a paso a escribir una versión Python de la función de síntesis de K-line

Creado el: 2019-12-21 09:38:26, Actualizado el: 2024-12-15 15:59:54
comments   4
hits   2839

Te enseñaré paso a paso a escribir una versión Python de la función de síntesis de K-line

Te enseñaré paso a paso a escribir una versión Python de la función de síntesis de K-line

Al escribir y utilizar estrategias, a menudo se utilizan algunos datos del ciclo de la línea K poco comunes. Sin embargo, los intercambios y las fuentes de datos no proporcionan datos para estos períodos. Sólo se puede sintetizar utilizando datos de ciclos existentes. El algoritmo de síntesis ya tiene una versión en JavaScript (Enlace), de hecho, es muy sencillo portar un código JavaScript a la versión Python. A continuación, escribamos una versión en Python del algoritmo de síntesis de K-line.

Versión de JavaScript

  function GetNewCycleRecords (sourceRecords, targetCycle) {    // K线合成函数
      var ret = []
      
      // 首先获取源K线数据的周期
      if (!sourceRecords || sourceRecords.length < 2) {
          return null
      }
      var sourceLen = sourceRecords.length
      var sourceCycle = sourceRecords[sourceLen - 1].Time - sourceRecords[sourceLen - 2].Time

      if (targetCycle % sourceCycle != 0) {
          Log("targetCycle:", targetCycle)
          Log("sourceCycle:", sourceCycle)
          throw "targetCycle is not an integral multiple of sourceCycle."
      }

      if ((1000 * 60 * 60) % targetCycle != 0 && (1000 * 60 * 60 * 24) % targetCycle != 0) {
          Log("targetCycle:", targetCycle)
          Log("sourceCycle:", sourceCycle)
          Log((1000 * 60 * 60) % targetCycle, (1000 * 60 * 60 * 24) % targetCycle)
          throw "targetCycle cannot complete the cycle."
      }

      var multiple = targetCycle / sourceCycle


      var isBegin = false 
      var count = 0
      var high = 0 
      var low = 0 
      var open = 0
      var close = 0 
      var time = 0
      var vol = 0
      for (var i = 0 ; i < sourceLen ; i++) {
          // 获取 时区偏移数值
          var d = new Date()
          var n = d.getTimezoneOffset()

          if (((1000 * 60 * 60 * 24) - sourceRecords[i].Time % (1000 * 60 * 60 * 24) + (n * 1000 * 60)) % targetCycle == 0) {
              isBegin = true
          }

          if (isBegin) {
              if (count == 0) {
                  high = sourceRecords[i].High
                  low = sourceRecords[i].Low
                  open = sourceRecords[i].Open
                  close = sourceRecords[i].Close
                  time = sourceRecords[i].Time
                  vol = sourceRecords[i].Volume

                  count++
              } else if (count < multiple) {
                  high = Math.max(high, sourceRecords[i].High)
                  low = Math.min(low, sourceRecords[i].Low)
                  close = sourceRecords[i].Close
                  vol += sourceRecords[i].Volume

                  count++
              }

              if (count == multiple || i == sourceLen - 1) {
                  ret.push({
                      High : high,
                      Low : low,
                      Open : open,
                      Close : close,
                      Time : time,
                      Volume : vol,
                  })
                  count = 0
              }
          }
      }

      return ret 
  }

Si existe un algoritmo de JavaScript, se puede traducir y trasplantar a Python línea por línea. Cuando se encuentran funciones integradas o métodos inherentes de JavaScript, simplemente se buscan los métodos correspondientes en Python, por lo que el trasplante es relativamente fácil. La lógica del algoritmo es exactamente la misma, solo la llamada a la función JavaScript.var n = d.getTimezoneOffset()Cuando se porta a Python, use la biblioteca de tiempo de Pythonn = time.altzonereemplazar. Otras diferencias solo están en la sintaxis del lenguaje (como el uso de bucles for, diferencias en valores booleanos, diferencias en el uso de “y” lógico, “no” lógico, “o” lógico, etc.).

El código Python trasplantado:

import time

def GetNewCycleRecords(sourceRecords, targetCycle):
    ret = []

    # 首先获取源K线数据的周期
    if not sourceRecords or len(sourceRecords) < 2 : 
        return None

    sourceLen = len(sourceRecords)
    sourceCycle = sourceRecords[-1]["Time"] - sourceRecords[-2]["Time"]

    if targetCycle % sourceCycle != 0 :
        Log("targetCycle:", targetCycle)
        Log("sourceCycle:", sourceCycle)
        raise "targetCycle is not an integral multiple of sourceCycle."

    if (1000 * 60 * 60) % targetCycle != 0 and (1000 * 60 * 60 * 24) % targetCycle != 0 : 
        Log("targetCycle:", targetCycle)
        Log("sourceCycle:", sourceCycle)
        Log((1000 * 60 * 60) % targetCycle, (1000 * 60 * 60 * 24) % targetCycle)
        raise "targetCycle cannot complete the cycle."
    
    multiple = targetCycle / sourceCycle

    isBegin = False
    count = 0 
    barHigh = 0 
    barLow = 0 
    barOpen = 0
    barClose = 0 
    barTime = 0 
    barVol = 0 

    for i in range(sourceLen) : 
        # 获取时区偏移数值
        n = time.altzone        

        if ((1000 * 60 * 60 * 24) - (sourceRecords[i]["Time"] * 1000) % (1000 * 60 * 60 * 24) + (n * 1000)) % targetCycle == 0 :
            isBegin = True

        if isBegin : 
            if count == 0 : 
                barHigh = sourceRecords[i]["High"]
                barLow = sourceRecords[i]["Low"]
                barOpen = sourceRecords[i]["Open"]
                barClose = sourceRecords[i]["Close"]
                barTime = sourceRecords[i]["Time"]
                barVol = sourceRecords[i]["Volume"]
                count += 1
            elif count < multiple : 
                barHigh = max(barHigh, sourceRecords[i]["High"])
                barLow = min(barLow, sourceRecords[i]["Low"])
                barClose = sourceRecords[i]["Close"]
                barVol += sourceRecords[i]["Volume"]
                count += 1

            if count == multiple or i == sourceLen - 1 :
                ret.append({
                    "High" : barHigh,
                    "Low" : barLow,
                    "Open" : barOpen,
                    "Close" : barClose,
                    "Time" : barTime,
                    "Volume" : barVol,
                })
                count = 0
    
    return ret 

# 测试
def main():
    while True:
        r = exchange.GetRecords()
        r2 = GetNewCycleRecords(r, 1000 * 60 * 60 * 4)      

        ext.PlotRecords(r2, "r2")                                 
        Sleep(1000)                                             

prueba

Gráfico del mercado de Huobi Te enseñaré paso a paso a escribir una versión Python de la función de síntesis de K-line

Prueba retrospectiva del gráfico sintético de 4 horas Te enseñaré paso a paso a escribir una versión Python de la función de síntesis de K-line

El código anterior se utiliza únicamente como referencia de aprendizaje. Si se utiliza en una estrategia específica, modifíquelo y pruébelo según sus necesidades. Si tienes algún ERROR o sugerencia de mejora, deja un mensaje, muchas gracias o^_^o