使用 通用协议 插件 实现 JS 语言 并发 任务

Author: 小小梦, Created: 2018-07-19 19:10:48, Updated: 2018-07-19 19:15:42

使用 通用协议 插件 实现 JS 语言 并发 任务

一直以来,总想在使用 JavaScript 编写策略时,用一段完全脱离主程序的并发逻辑不停地获取行情,或者做一些其它操作。Python 语言可以直接在策略里使用多线程库,JavaScript 则没有办法实现。不过,如果只是并发获取行情,也有更简单的处理方式:可以使用 exchange.Go 函数。

  • 使用 通用协议 插件 实现一个 并发程序 进行不停 访问 行情接口 收集行情数据。

    golang 抛砖引玉:

/*
GOOS=linux GOARCH=amd64 go build -ldflags '-s -w -extldflags -static' AssistantEx.go
*/
package main

import (
    "bytes"
    // "crypto/md5"
    // "encoding/hex"
    "encoding/json"
    "errors"
    "flag"
    "fmt"
    "io/ioutil"
    "log"
    "net/http"
    "net/url"
    "sort"
    "strconv"
    "strings"
    "time"
    "crypto/tls"                           
    "context"
)

// 测试用函数 ---------------------------
// toFloat converts a loosely typed value to float64.
//
// Supported inputs are float64, float32, int, int32, int64 and string
// (surrounding whitespace is trimmed before parsing). Every other type,
// including nil, yields 0.
func toFloat(s interface{}) float64 {
    switch n := s.(type) {
    case float64:
        return n
    case float32:
        return float64(n)
    case int:
        return float64(n)
    case int32:
        return float64(n)
    case int64:
        return float64(n)
    case string:
        // Best-effort conversion: the parse error is deliberately dropped and
        // whatever strconv.ParseFloat produced is handed back unchanged.
        parsed, _ := strconv.ParseFloat(strings.TrimSpace(n), 64)
        return parsed
    }
    return 0
}

// httpClient is the single HTTP client shared by every outbound request in
// this program.
var httpClient *http.Client

// init builds httpClient. The same one-minute limit is used both for the
// whole request and for waiting on response headers, and at most five idle
// connections are kept per host.
func init() {
    const requestTimeout = time.Minute

    transport := &http.Transport{
        MaxIdleConnsPerHost: 5,
        // NOTE(review): TLS certificate verification is switched off here,
        // which exposes every request to man-in-the-middle attacks. Kept
        // as-is to preserve behaviour; enable verification unless the
        // target endpoint really cannot present a valid certificate.
        TLSClientConfig:       &tls.Config{InsecureSkipVerify: true},
        ResponseHeaderTimeout: requestTimeout,
    }

    httpClient = &http.Client{
        Transport: transport,
        Timeout:   requestTimeout,
    }
}

// float2str renders i in plain decimal notation (no exponent) using the
// smallest number of digits that still round-trips as a float64.
func float2str(i float64) string {
    digits := strconv.AppendFloat(nil, i, 'f', -1, 64)
    return string(digits)
}

// toInt64 converts a loosely typed value to int64.
//
// Supported inputs are int, int64, float64 (truncated toward zero), bool
// (true is 1, false is 0) and base-10 strings (surrounding whitespace is
// trimmed). Every other type, including nil, yields 0.
func toInt64(s interface{}) int64 {
    switch n := s.(type) {
    case int:
        return int64(n)
    case int64:
        return n
    case float64:
        return int64(n)
    case bool:
        if n {
            return 1
        }
        return 0
    case string:
        // Best-effort conversion: the parse error is deliberately dropped and
        // whatever strconv.ParseInt produced is handed back unchanged.
        parsed, _ := strconv.ParseInt(strings.TrimSpace(n), 10, 64)
        return parsed
    }
    return 0
}

// toString renders a loosely typed value as text.
//
// string is returned unchanged, int64 is printed in base 10, float64 in plain
// decimal notation with the shortest round-trip precision, and bool as
// "true"/"false". Anything else falls back to fmt's default %v formatting.
func toString(s interface{}) string {
    switch val := s.(type) {
    case string:
        return val
    case int64:
        return strconv.FormatInt(val, 10)
    case float64:
        return strconv.FormatFloat(val, 'f', -1, 64)
    case bool:
        return strconv.FormatBool(val)
    }
    return fmt.Sprintf("%v", s)
}

// Json is a thin wrapper around a value decoded by encoding/json into an
// empty interface, with helpers for navigating and converting it.
type Json struct {
    data interface{}
}

// NewJson decodes body and returns the wrapped result, or the decoding error.
func NewJson(body []byte) (*Json, error) {
    parsed := &Json{}
    if err := parsed.UnmarshalJSON(body); err != nil {
        return nil, err
    }
    return parsed, nil
}

// UnmarshalJSON decodes p into the wrapped value.
func (j *Json) UnmarshalJSON(p []byte) error {
    return json.Unmarshal(p, &j.data)
}

// Get returns the member stored under key. When the wrapped value is not an
// object, or the key is absent, it returns a Json wrapping nil so that calls
// can be chained safely.
func (j *Json) Get(key string) *Json {
    if child, found := j.CheckGet(key); found {
        return child
    }
    return &Json{nil}
}

// CheckGet is like Get but reports whether the key was found; on a miss it
// returns (nil, false).
func (j *Json) CheckGet(key string) (*Json, bool) {
    obj, err := j.Map()
    if err != nil {
        return nil, false
    }
    val, found := obj[key]
    if !found {
        return nil, false
    }
    return &Json{val}, true
}

// Map returns the wrapped value as an object, or an error if it is not one.
func (j *Json) Map() (map[string]interface{}, error) {
    obj, ok := j.data.(map[string]interface{})
    if !ok {
        return nil, errors.New("type assertion to map[string]interface{} failed")
    }
    return obj, nil
}

// Array returns the wrapped value as a slice, or an error if it is not one.
func (j *Json) Array() ([]interface{}, error) {
    list, ok := j.data.([]interface{})
    if !ok {
        return nil, errors.New("type assertion to []interface{} failed")
    }
    return list, nil
}

// Bool returns the wrapped value as a bool, or an error if it is not one.
func (j *Json) Bool() (bool, error) {
    flag, ok := j.data.(bool)
    if !ok {
        return false, errors.New("type assertion to bool failed")
    }
    return flag, nil
}

// String returns the wrapped value as a string, or an error if it is not one.
func (j *Json) String() (string, error) {
    text, ok := j.data.(string)
    if !ok {
        return "", errors.New("type assertion to string failed")
    }
    return text, nil
}

// Bytes returns the bytes of a wrapped string, or an error if the wrapped
// value is not a string.
func (j *Json) Bytes() ([]byte, error) {
    text, ok := j.data.(string)
    if !ok {
        return nil, errors.New("type assertion to []byte failed")
    }
    return []byte(text), nil
}

// Int returns the wrapped float64 truncated to int. If the wrapped value is
// not a float64 it returns -1 together with an error.
func (j *Json) Int() (int, error) {
    num, ok := j.data.(float64)
    if !ok {
        return -1, errors.New("type assertion to float64 failed")
    }
    return int(num), nil
}

// MustArray returns the wrapped slice, or the optional default (nil when
// omitted) if the wrapped value is not a slice. Passing more than one
// default panics.
func (j *Json) MustArray(args ...[]interface{}) []interface{} {
    if len(args) > 1 {
        log.Panicf("MustArray() received too many arguments %d", len(args))
    }
    if list, err := j.Array(); err == nil {
        return list
    }
    if len(args) == 1 {
        return args[0]
    }
    return nil
}

// MustMap returns the wrapped object, or the optional default (nil when
// omitted) if the wrapped value is not an object. Passing more than one
// default panics.
func (j *Json) MustMap(args ...map[string]interface{}) map[string]interface{} {
    if len(args) > 1 {
        log.Panicf("MustMap() received too many arguments %d", len(args))
    }
    if obj, err := j.Map(); err == nil {
        return obj
    }
    if len(args) == 1 {
        return args[0]
    }
    return nil
}

// MustString returns the wrapped string, or the optional default ("" when
// omitted) if the wrapped value is not a string. Passing more than one
// default panics.
func (j *Json) MustString(args ...string) string {
    if len(args) > 1 {
        log.Panicf("MustString() received too many arguments %d", len(args))
    }
    if text, err := j.String(); err == nil {
        return text
    }
    if len(args) == 1 {
        return args[0]
    }
    return ""
}

// MustInt64 converts the wrapped value to int64. Numeric values are
// converted directly (float64 is truncated), strings are parsed as base-10
// integers and panic when they do not parse, and every other type yields 0.
func (j *Json) MustInt64() int64 {
    switch val := j.data.(type) {
    case int:
        return int64(val)
    case int64:
        return val
    case float64:
        return int64(val)
    case string:
        parsed, err := strconv.ParseInt(val, 10, 64)
        if err != nil {
            panic(err)
        }
        return parsed
    }
    // Unsupported types are tolerated rather than treated as fatal.
    return 0
}

// MustFloat64 converts the wrapped value to float64. Numeric values are
// converted directly, strings have every "," removed before being parsed and
// panic when they do not parse, and every other type yields 0.
func (j *Json) MustFloat64() float64 {
    switch val := j.data.(type) {
    case int:
        return float64(val)
    case int64:
        return float64(val)
    case float64:
        return val
    case string:
        // Drop thousands separators such as "1,234.5" before parsing.
        cleaned := strings.Replace(val, ",", "", -1)
        parsed, err := strconv.ParseFloat(cleaned, 64)
        if err != nil {
            panic(err)
        }
        return parsed
    }
    // Unsupported types are tolerated rather than treated as fatal.
    return 0
}

// iAssistantEx holds the settings of an exchange API client.
//
// In the live code visible in this file, newAssistantEx assigns accessKey,
// secretKey, apiBase, timeout and timeLocation, and the apiCall method reads
// apiBase. The remaining fields are not touched by any visible live code.
// NOTE(review): they look like leftovers from a fuller exchange plugin —
// confirm before relying on or removing them.
type iAssistantEx struct{
    accessKey      string
    secretKey      string
    currency       string
    opCurrency     string
    baseCurrency   string
    secret         string
    secretExpires  int64
    // apiBase is the URL prefix that apiCall prepends to the request path.
    apiBase        string
    step           int64
    newRate        float64
    timeout        time.Duration
    timeLocation   *time.Location
}

// Item is one key/value pair taken from a parameter map.
type Item struct {
    Key string
    Val string
}

// MapSorter is a list of Items that implements sort.Interface, ordering the
// entries by Key in ascending byte order.
type MapSorter []Item

// NewMapSorter copies m into an unsorted MapSorter. A key that starts with
// "!" has every "!" it contains removed (not only the leading one); all other
// keys are copied unchanged.
func NewMapSorter(m map[string]string) MapSorter {
    pairs := make(MapSorter, 0, len(m))
    for key, val := range m {
        name := key
        if strings.HasPrefix(name, "!") {
            name = strings.Replace(name, "!", "", -1)
        }
        pairs = append(pairs, Item{Key: name, Val: val})
    }
    return pairs
}

// Len reports the number of entries.
func (ms MapSorter) Len() int {
    return len(ms)
}

// Less orders entries by key. (Ordering by value would compare Val instead.)
func (ms MapSorter) Less(i, j int) bool {
    left, right := ms[i].Key, ms[j].Key
    return left < right
}

// Swap exchanges the entries at positions i and j.
func (ms MapSorter) Swap(i, j int) {
    held := ms[i]
    ms[i] = ms[j]
    ms[j] = held
}

// encodeParams serialises params in ascending key order (after NewMapSorter
// has normalised the keys).
//
// When onlyPlusKeyAndValue is true the result is every key immediately
// followed by its value, with no separators at all. Otherwise the result is
// "k1=v1&k2=v2...". Keys and values are written verbatim in both forms: no
// URL escaping is applied.
func encodeParams(params map[string]string, onlyPlusKeyAndValue bool) string {
    sorted := NewMapSorter(params)
    sort.Sort(sorted)

    if onlyPlusKeyAndValue {
        var joined bytes.Buffer
        for _, pair := range sorted {
            joined.WriteString(pair.Key)
            joined.WriteString(pair.Val)
        }
        return joined.String()
    }

    // Group values by key so that keys which became identical after
    // normalisation end up next to each other.
    grouped := url.Values{}
    for _, pair := range sorted {
        grouped.Add(pair.Key, pair.Val)
    }

    names := make([]string, 0, len(grouped))
    for name := range grouped {
        names = append(names, name)
    }
    sort.Strings(names)

    var out bytes.Buffer
    for _, name := range names {
        for _, val := range grouped[name] {
            if out.Len() > 0 {
                out.WriteByte('&')
            }
            out.WriteString(name)
            out.WriteByte('=')
            out.WriteString(val)
        }
    }
    return out.String()
}

// newAssistantEx returns a client configured with the given key pair, the
// coinbig.com API base URL, a 20-second timeout and a fixed UTC+8 time zone
// labelled "Asia/Shanghai". All other fields keep their zero values.
func newAssistantEx(accessKey, secretKey string) *iAssistantEx {
    return &iAssistantEx{
        accessKey:    accessKey,
        secretKey:    secretKey,
        apiBase:      "https://www.coinbig.com",
        timeout:      20 * time.Second,
        timeLocation: time.FixedZone("Asia/Shanghai", 8*60*60),
    }
}

// apiCall performs an unsigned request of kind httpType against
// p.apiBase+method using the shared httpClient and returns the response body
// parsed as JSON. The HTTP status code is not inspected.
//
// The request is echoed to stdout for debugging before the construction
// error is checked, so a failed construction prints a nil request.
func (p *iAssistantEx) apiCall(method string, httpType string) (*Json, error) {
    // No signature is required for this endpoint.
    endpoint := p.apiBase + method
    req, err := http.NewRequest(httpType, endpoint, nil)

    // Debug trace; the escape sequences colour the banner in a terminal.
    fmt.Printf("\n %c[1;44;32m%s%c[0m\n", 0x1B, "apiCall create req:" + method, 0x1B)
    fmt.Println("httpType:", httpType, "req:", req)

    if err != nil {
        return nil, err
    }
    req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

    resp, err := httpClient.Do(req)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    payload, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return nil, err
    }

    // No extra fault tolerance: a body that is not valid JSON is reported as
    // the decoding error.
    return NewJson(payload)
}

/* 签名的函数,如果需要 访问签名接口时使用。
func (p *iAssistantEx) tapiCall(method string, params map[string]string, httpType string) (js *Json, err error) {
    if params == nil {
        params = map[string]string{}
    }
    params["apikey"] = p.accessKey
    params["time"] = strconv.FormatInt(time.Now().UnixNano() / 1e6, 10)            // 获取 时间戳

    h := md5.New()
    h.Write([]byte(encodeParams(params, false) + "&secret_key=" + p.secretKey))
    params["sign"] = strings.ToUpper(hex.EncodeToString(h.Sum(nil)))
    qs := encodeParams(params, false)

    req, err := http.NewRequest(httpType, fmt.Sprintf("%s%s?%s", p.apiBase, method, qs), nil)
    
    // 测试
    // fmt.Printf("\n %c[1;40;32m%s%c[0m\n\n", 0x1B, "testPrintColor", 0x1B)    // 输出颜色
    fmt.Printf("\n %c[1;44;32m%s%c[0m\n", 0x1B, "tapiCall create req:" + method, 0x1B)
    fmt.Println("httpType:", httpType, "params:", params, "req:", req)          // 测试 输出 http 请求创建

    if err != nil {
        return nil, err
    }
    req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

    resp, err := httpClient.Do(req)                              // 使用全局 的自定义的 httpClient , HTTP 对象
    if err != nil {
        return nil, err
    }

    defer resp.Body.Close()
    b, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return nil, err
    }
    js, err = NewJson(b)

    // 容错

    return js, err
}
*/

// RpcRequest is the JSON body decoded from each request handled by OnPost.
type RpcRequest struct {        // Field names must start with an upper-case letter: only exported fields are visible to encoding/json.
                                // Unmarshal matches JSON keys against the struct tags below, so every field carries one.
    AccessKey string            `json:"access_key"`
    SecretKey string            `json:"secret_key"`
    Nonce     int64             `json:"nonce"`
    Method    string            `json:"method"`
    Params    map[string]string `json:"params"`
}

// Produce polls method (a full URL) with GET about once per second and sends
// each result on p until ctx is cancelled.
//
// A successful call is sent as the decoded JSON object (a nil map when the
// body is not a JSON object); a failed call is sent as a map with a single
// "error" entry holding the error text.
//
// Both the send and the pause between polls watch ctx. The previous
// unconditional send could block forever once the receiving side had
// stopped, leaking this goroutine.
func Produce(p chan<- interface{}, ctx context.Context, method string){
    finished := func() {
        fmt.Println("Produce is finished!")   // debug trace
    }

    for {
        // Stop before starting another request if we have been cancelled.
        select {
        case <-ctx.Done():
            finished()
            return
        default:
        }

        var item interface{}
        js, err := apiCall(method, "GET")
        if err != nil {
            item = map[string]interface{}{
                "error" : fmt.Sprintf("%v", err),
            }
        } else if parsed, ok := js.(*Json); ok && parsed != nil {
            item = parsed.MustMap()
        } else {
            // An unchecked assertion here would panic and take the whole
            // process down, because nothing recovers in this goroutine.
            item = map[string]interface{}{
                "error" : fmt.Sprintf("unexpected result type %T", js),
            }
        }

        // Hand the result over, but give up as soon as we are cancelled.
        select {
        case p <- item:
        case <-ctx.Done():
            finished()
            return
        }

        // Pause between polls without delaying shutdown.
        select {
        case <-time.After(time.Second * 1):
        case <-ctx.Done():
            finished()
            return
        }
    }
}

// Consumer stores every value received from c in the package-level ticker,
// pausing one second after each value, until ctx is cancelled.
//
// The receive is performed inside the same select as the cancellation check.
// The previous blocking receive could wait forever after cancellation when
// the sender had already stopped, leaking this goroutine.
//
// NOTE(review): ticker is written here without synchronisation while OnPost
// reads it from HTTP handler goroutines; that is a data race which needs a
// coordinated fix (mutex or atomic.Value) on both sides.
func Consumer(c <-chan interface{}, ctx context.Context){
    for {
        select {
        case <-ctx.Done():
            fmt.Println("Consumer is finished!")   // debug trace
            return
        case latest := <-c:
            ticker = latest
        }

        // Keep the original one-second pacing, but stay responsive to
        // cancellation while waiting.
        select {
        case <-ctx.Done():
            fmt.Println("Consumer is finished!")   // debug trace
            return
        case <-time.After(time.Second * 1):
        }
    }
}

// apiCall performs an unsigned request of kind httpType against the full URL
// given in method, using the shared httpClient, and returns the response
// body parsed as JSON. The HTTP status code is not inspected.
//
// On success the returned interface holds a *Json. On any failure the
// returned interface is a true nil; previously a JSON decoding failure
// produced an interface wrapping a nil *Json, which compares unequal to nil.
//
// The request is echoed to stdout for debugging before the construction
// error is checked, so a failed construction prints a nil request.
func apiCall(method string, httpType string) (interface{}, error) {
    // No signature is required for this endpoint.
    req, err := http.NewRequest(httpType, method, nil)

    // Debug trace; the escape sequences colour the banner in a terminal.
    fmt.Printf("\n %c[1;44;32m%s%c[0m\n", 0x1B, "apiCall create req:" + method, 0x1B)
    fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "httpType:", httpType, "req:", req)

    if err != nil {
        return nil, err
    }
    req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

    resp, err := httpClient.Do(req)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    b, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        return nil, err
    }

    js, err := NewJson(b)
    if err != nil {
        // Return an untyped nil so callers never see a non-nil interface
        // that wraps a nil pointer.
        return nil, err
    }
    return js, nil
}

// ticker is the most recent value stored by Consumer; OnPost returns it for
// "type=get" requests. It is nil until the first value arrives.
//
// NOTE(review): ticker and _Cancel are accessed from several goroutines
// (Consumer and concurrent HTTP handlers) without synchronisation. Guard
// them with a mutex if requests can overlap.
var ticker interface{}

// _Cancel holds the context.CancelFunc of the currently running
// Produce/Consumer pair, or nil when none is running.
var _Cancel interface{}

// OnPost handles one plugin request. The body is a JSON RpcRequest whose
// Method must start with "__api_"; the remainder of Method is the URL to
// poll, and Params["type"] selects the command:
//
//   - "get":      respond with {"data": <latest ticker>}.
//   - "create":   start a Produce/Consumer pair for the URL, stopping any
//                 pair that is already running first.
//   - "finished": stop the running pair.
//   - otherwise:  respond with {"data": null}.
//
// Every failure, and also the completion of "create" and "finished", is
// reported by panicking; the deferred function turns the panic into an
// {"error": "<message>"} response.
func OnPost(w http.ResponseWriter, r *http.Request) {
    var ret interface{}
    defer func() {
        if e := recover(); e != nil {
            if ee, ok := e.(error); ok {
                e = ee.Error()
            }
            ret = map[string]string{"error": fmt.Sprintf("%v", e)}
        }
        b, err := json.Marshal(ret)
        if err != nil {
            // Marshalling a map[string]string cannot fail, so this error is
            // safe to ignore.
            b, _ = json.Marshal(map[string]string{"error": err.Error()})
        }
        if _, err := w.Write(b); err != nil {
            log.Println("OnPost: writing response:", err)
        }
    }()

    b, err := ioutil.ReadAll(r.Body)
    if err != nil {
        panic(err)
    }
    var request RpcRequest
    if err = json.Unmarshal(b, &request); err != nil {
        panic(err)
    }

    const apiPrefix = "__api_"
    if !strings.HasPrefix(request.Method, apiPrefix) {
        panic(errors.New(request.Method + " not support"))
    }
    // The rest of Method is the complete URL to poll.
    target := strings.TrimPrefix(request.Method, apiPrefix)

    var data interface{}
    switch request.Params["type"] {
    case "get":
        data = ticker
        fmt.Println("get ticker:", ticker)
    case "create":
        // Stop the previous pair, if any; otherwise its goroutines would
        // keep running with no way left to cancel them.
        if prev, ok := _Cancel.(context.CancelFunc); ok && prev != nil {
            prev()
        }
        ch := make(chan interface{}, 1)
        ctx, cancel := context.WithCancel(context.Background())
        go Produce(ch, ctx, target)
        go Consumer(ch, ctx)
        _Cancel = cancel
        panic(errors.New("创建 Produce , Consumer"))
    case "finished":
        stop, ok := _Cancel.(context.CancelFunc)
        if !ok || stop == nil {
            // Previously this surfaced as an opaque interface-conversion
            // runtime panic.
            panic(errors.New("no running Produce/Consumer to stop"))
        }
        stop()
        _Cancel = nil
        panic(errors.New("销毁 Produce , Consumer"))
    default:
        // Other exchange.IO requests: nothing to do, data stays nil.
    }

    ret = map[string]interface{}{
        "data": data,
    }
}

// main serves the plugin endpoint /exchange on the address given by the -b
// flag (default 127.0.0.1:6666). An empty address prints the usage text and
// exits.
func main() {
    var addr = flag.String("b", "127.0.0.1:6666", "bind addr")
    flag.Parse()
    if *addr == "" {
        flag.Usage()
        return
    }
    basePath := "/exchange"
    log.Println("Running ", fmt.Sprintf("http://%s%s", *addr, basePath), "...")
    http.HandleFunc(basePath, OnPost)
    // ListenAndServe only returns on failure (for example when the port is
    // already in use). Report the error instead of exiting silently.
    if err := http.ListenAndServe(*addr, nil); err != nil {
        log.Fatalln("ListenAndServe:", err)
    }
}
  • 测试策略
// Test strategy: asks the plugin to start polling the ticker URL, then reads
// the cached result every 5 seconds and logs how long each read took. On the
// iteration where the counter equals 30 it also asks the plugin to stop
// polling; the loop itself keeps running.
function main() {
    var tickerUrl = "https://openapi.ocx.com/api/v2/tickers/btcusdt"

    var created = exchange.IO("api", "POST", tickerUrl, "type=create")
    Log(created)

    for (var i = 0; ; i++) {
        var startedAt = new Date().getTime()
        var latest = exchange.IO("api", "POST", tickerUrl, "type=get")
        var finishedAt = new Date().getTime()
        Log("exchange.IO 耗时(毫秒):", finishedAt - startedAt, " 第:", i, "次, ret1:", latest)

        if (i == 30) {
            var stopped = exchange.IO("api", "POST", tickerUrl, "type=finished")
            Log("ret2:", stopped)
        }
        Sleep(5000)
    }
}

img

插件的使用方式和一般的通用协议插件一样:
把插件代码编译成可执行程序,和托管者一起运行就可以了。

测试用的是  OCX 交易所的  ticker 接口: https://openapi.ocx.com/api/v2/tickers/btcusdt

- 创建并发程序: 
  var ret = exchange.IO("api", "POST", "https://openapi.ocx.com/api/v2/tickers/btcusdt", "type=create")

- 获取 并发程序 实时访问的数据: 
  var ret1 = exchange.IO("api", "POST", "https://openapi.ocx.com/api/v2/tickers/btcusdt", "type=get")
  
- 结束并发程序:
  var ret2 = exchange.IO("api", "POST", "https://openapi.ocx.com/api/v2/tickers/btcusdt", "type=finished")

仅仅玩一下,如有错误欢迎提出,还需学习。


More