Apprendre à écrire une fonction de synthèse de ligne K dans la version Python

Auteur:Je ne sais pas., Créé: 2022-12-26 09:28:58, Mis à jour: 2023-09-20 09:48:46

img

Apprendre à écrire une fonction de synthèse de ligne K dans la version Python

Lors de l'écriture et de l'utilisation de stratégies, nous utilisons souvent des données de période de ligne K rarement utilisées. Cependant, les échanges et les sources de données ne fournissent pas de données sur ces périodes. Il ne peut être synthétisé qu'en utilisant des données avec une période existante. L'algorithme synthétisé a déjà une version JavaScript (lienEn fait, il est facile de transplanter un morceau de code JavaScript dans Python. Ensuite, écrivons une version Python de l'algorithme de synthèse de ligne K.

Version JavaScript

  function GetNewCycleRecords (sourceRecords, targetCycle) {    // K-line synthesis function
      var ret = []
      
      // Obtain the period of the source K-line data first
      if (!sourceRecords || sourceRecords.length < 2) {
          return null
      }
      var sourceLen = sourceRecords.length
      var sourceCycle = sourceRecords[sourceLen - 1].Time - sourceRecords[sourceLen - 2].Time

      if (targetCycle % sourceCycle != 0) {
          Log("targetCycle:", targetCycle)
          Log("sourceCycle:", sourceCycle)
          throw "targetCycle is not an integral multiple of sourceCycle."
      }

      if ((1000 * 60 * 60) % targetCycle != 0 && (1000 * 60 * 60 * 24) % targetCycle != 0) {
          Log("targetCycle:", targetCycle)
          Log("sourceCycle:", sourceCycle)
          Log((1000 * 60 * 60) % targetCycle, (1000 * 60 * 60 * 24) % targetCycle)
          throw "targetCycle cannot complete the cycle."
      }

      var multiple = targetCycle / sourceCycle


      var isBegin = false 
      var count = 0
      var high = 0 
      var low = 0 
      var open = 0
      var close = 0 
      var time = 0
      var vol = 0
      for (var i = 0 ; i < sourceLen ; i++) {
          // Get the time zone offset value
          var d = new Date()
          var n = d.getTimezoneOffset()

          if (((1000 * 60 * 60 * 24) - sourceRecords[i].Time % (1000 * 60 * 60 * 24) + (n * 1000 * 60)) % targetCycle == 0) {
              isBegin = true
          }

          if (isBegin) {
              if (count == 0) {
                  high = sourceRecords[i].High
                  low = sourceRecords[i].Low
                  open = sourceRecords[i].Open
                  close = sourceRecords[i].Close
                  time = sourceRecords[i].Time
                  vol = sourceRecords[i].Volume

                  count++
              } else if (count < multiple) {
                  high = Math.max(high, sourceRecords[i].High)
                  low = Math.min(low, sourceRecords[i].Low)
                  close = sourceRecords[i].Close
                  vol += sourceRecords[i].Volume

                  count++
              }

              if (count == multiple || i == sourceLen - 1) {
                  ret.push({
                      High : high,
                      Low : low,
                      Open : open,
                      Close : close,
                      Time : time,
                      Volume : vol,
                  })
                  count = 0
              }
          }
      }

      return ret 
  }

Il existe des algorithmes JavaScript. Python peut être traduit et transplanté ligne par ligne. Si vous rencontrez des fonctions intégrées ou des méthodes inhérentes à JavaScript, vous pouvez aller à Python pour trouver les méthodes correspondantes. Par conséquent, la migration est facile. La logique de l'algorithme est exactement la même, sauf que la fonction JavaScript appellevar n=d.getTimezoneOffset(). lors de la migration vers Python,n=time.altzoneLes autres différences sont uniquement en termes de grammaire du langage (comme l'utilisation de for pour les boucles, les valeurs booléennes, logique AND, logique NOT, logique OR, etc.).

Code Python migré:

import time

def GetNewCycleRecords(sourceRecords, targetCycle):
    ret = []

    # Obtain the period of the source K-line data first
    if not sourceRecords or len(sourceRecords) < 2 : 
        return None

    sourceLen = len(sourceRecords)
    sourceCycle = sourceRecords[-1]["Time"] - sourceRecords[-2]["Time"]

    if targetCycle % sourceCycle != 0 :
        Log("targetCycle:", targetCycle)
        Log("sourceCycle:", sourceCycle)
        raise "targetCycle is not an integral multiple of sourceCycle."

    if (1000 * 60 * 60) % targetCycle != 0 and (1000 * 60 * 60 * 24) % targetCycle != 0 : 
        Log("targetCycle:", targetCycle)
        Log("sourceCycle:", sourceCycle)
        Log((1000 * 60 * 60) % targetCycle, (1000 * 60 * 60 * 24) % targetCycle)
        raise "targetCycle cannot complete the cycle."
    
    multiple = targetCycle / sourceCycle

    isBegin = False
    count = 0 
    barHigh = 0 
    barLow = 0 
    barOpen = 0
    barClose = 0 
    barTime = 0 
    barVol = 0 

    for i in range(sourceLen) : 
        # Get the time zone offset value
        n = time.altzone        

        if ((1000 * 60 * 60 * 24) - (sourceRecords[i]["Time"] * 1000) % (1000 * 60 * 60 * 24) + (n * 1000)) % targetCycle == 0 :
            isBegin = True

        if isBegin : 
            if count == 0 : 
                barHigh = sourceRecords[i]["High"]
                barLow = sourceRecords[i]["Low"]
                barOpen = sourceRecords[i]["Open"]
                barClose = sourceRecords[i]["Close"]
                barTime = sourceRecords[i]["Time"]
                barVol = sourceRecords[i]["Volume"]
                count += 1
            elif count < multiple : 
                barHigh = max(barHigh, sourceRecords[i]["High"])
                barLow = min(barLow, sourceRecords[i]["Low"])
                barClose = sourceRecords[i]["Close"]
                barVol += sourceRecords[i]["Volume"]
                count += 1

            if count == multiple or i == sourceLen - 1 :
                ret.append({
                    "High" : barHigh,
                    "Low" : barLow,
                    "Open" : barOpen,
                    "Close" : barClose,
                    "Time" : barTime,
                    "Volume" : barVol,
                })
                count = 0
    
    return ret 

# Test
def main():
    while True:
        r = exchange.GetRecords()
        r2 = GetNewCycleRecords(r, 1000 * 60 * 60 * 4)      

        ext.PlotRecords(r2, "r2")                                 
        Sleep(1000)

Test de détection

Graphique du marché Huobi

img

Graphique de synthèse des backtests de 4 heures

img

Le code ci-dessus est uniquement à titre de référence; s'il est utilisé dans des stratégies spécifiques, veuillez le modifier et le tester conformément aux exigences spécifiques. S'il y a un bug ou une suggestion d'amélioration, veuillez laisser un message.


Relationnée

Plus de