С быстрым развитием финансовых рынков и популярностью количественной торговли все больше трейдеров начинают полагаться на автоматизированные стратегии торговли. В этом процессе особенно важны коммуникация и координация между стратегиями. FMZ (платформа количественной торговли) помогает трейдерам добиться бесшовной интеграции стратегий и обмена данными в реальном времени, предоставляя эффективный протокол связи между реальными торговыми стратегиями.
В этой статье подробно рассматривается протокол обмена данными в реальном времени для торговых стратегий на платформе FMZ, а также представлена концепция его дизайна, функциональные особенности и преимущества в практическом применении. С помощью подробного анализа случаев мы покажем, как использовать этот протокол для достижения эффективной и стабильной стратегической коммуникации, а также улучшения исполнения и прибыльности торговых стратегий.
Независимо от того, являетесь ли вы энтузиастом количественной торговли, который только начинает работать с FMZ, или опытным профессиональным программистом, эта статья предоставит вам ценную информацию и практические руководства по работе. Давайте рассмотрим мощные функции FMZ и узнаем, как добиться взаимодействия между стратегиями с помощью эффективных протоколов связи, повысить эффективность торговли и использовать рыночные возможности.
Сценарий спроса
-
- Многостратегическая совместная торговля
Сценарий спроса:
В сложной рыночной среде одна стратегия может оказаться неспособной справиться с различными чрезвычайными ситуациями и изменениями рынка. Трейдеры хотят одновременно использовать несколько стратегий, например, стратегии следования за трендом, стратегии возврата к среднему значению и арбитражные стратегии, и позволить этим стратегиям взаимодействовать в режиме реального времени для обмена рыночной информацией и торговыми сигналами, тем самым повышая общую эффективность и стабильность торговли.
- Многостратегическая совместная торговля
-
- Межрыночный арбитраж
Сценарий спроса:
Трейдеры хотят проводить арбитражные операции между различными торговыми рынками. Например, арбитраж можно осуществлять, используя разницу в ценах между рынком акций класса А и фондовым рынком Гонконга. Когда на определенном рынке возникают аномальные цены, стратегия должна оперативно уведомлять стратегии на других рынках о необходимости выполнения соответствующих операций купли-продажи с целью использования арбитражных возможностей.
- Межрыночный арбитраж
-
- Управление рисками и хеджирование
Сценарий спроса:
Одна стратегия отвечает за поиск и исполнение высокорисковых и высокодоходных сделок на рынке, в то время как другая стратегия фокусируется на мониторинге общего риска и проведении операций хеджирования. Чтобы гарантировать, что во время высокорисковых транзакций не возникнут чрезмерные потери, обе стратегии должны взаимодействовать и обмениваться данными в режиме реального времени, чтобы можно было своевременно корректировать позиции и хеджировать риски.
- Управление рисками и хеджирование
-
- Распределенная торговая система
Сценарий спроса:
Крупные торговые организации надеются запустить распределенные торговые системы на нескольких физических серверах, чтобы повысить отказоустойчивость и производительность торговых систем. Стратегии на этих серверах должны синхронизировать данные и координировать операции посредством протоколов связи для обеспечения стабильной и эффективной работы всей торговой системы.
- Распределенная торговая система
-
- Мониторинг рынка и раннее оповещение
Сценарий спроса:
Стратегия отвечает за мониторинг динамики рынка в режиме реального времени. Когда на рынке происходят серьезные изменения (например, внезапное падение или скачок цен), стратегия должна быстро уведомить другие стратегии о необходимости предпринять соответствующие действия, например, закрыть позиции, корректировка позиций или добавление позиций для снижения рисков. Или использование торговой возможности.
- Мониторинг рынка и раннее оповещение
-
- Управление стратегией портфеля
Сценарий спроса:
Трейдеры используют портфель стратегий для управления инвестициями в различные классы активов, при этом каждая стратегия фокусируется на определенном классе активов (например, акции, облигации, фьючерсы и т. д.). Эти стратегии необходимо согласовывать и координировать для достижения общей оптимизации портфельных инвестиций и максимизации доходности.
- Управление стратегией портфеля
Эти сценарии спроса демонстрируют различные возможности и преимущества протокола связи в реальном времени торговой стратегии FMZ в практических приложениях. Благодаря эффективной коммуникации между стратегиями трейдеры могут лучше справляться со сложными рыночными условиями, оптимизировать торговые стратегии и повышать эффективность торговли и прибыль.
Протокол связи и функция набора номера пакета FMZ
Поняв требования к связи между реальными дисками, нам необходимо рассмотреть, как реализовать эти требования. Это не более чем то, что реальный случай А надеется обменяться информацией с реальным случаем Б, хотя требование кажется простым. Однако необходимо согласовать различные детали с использованием набора протоколов связи. FMZ инкапсулировал несколько популярных протоколов связи.
mqtt / nats / amqp / kafka
Архитектура коммуникаций
Архитектура коммуникации:
- Сервер (Прокси).
Для передачи сообщений между подписчиками и издателями должен быть запущен сервер протокола связи. Этот сервер может быть развернут локально на хостовой системе (для локальной связи с реальными дисками); он также может быть удаленной службой (для кросс-серверной связи с реальными дисками). - Клиенты (подписчики, издатели).
Стратегическую программу на FMZ можно понимать как клиента протокола связи. Стратегическая программа может быть издателем (pub) или подписчиком (sub).
Функция набора номера
При применении этих протоколов на платформе FMZ, это можно просто понимать как mqtt/nats/amqp/kafka. Эти протоколы интегрированы вDial()В функции используйтеDial()Функции используются для публикации и подписки сообщений. Эти опубликованные сообщения передаются (ретранслируются) на подписанный реальный диск через сервер протокола, поэтому сначала необходимо запустить сервер протокола. Для простоты демонстрации в следующих примерах мы используем различные развертывания образов серверов протоколов.
Раздел документации API функции набора номера: https://www.fmz.com/syntax-guide#fun_dial
Перед развертыванием образа Docker не забудьте сначала установить программное обеспечение Docker.
Далее давайте рассмотрим и попрактикуем приложения протокола связи, поддерживаемые FMZ.
Практика протокола связи в реальном времени платформы FMZ
MQTT-протокол
MQTT (Message Queuing Telemetry Transport) — это облегченный протокол передачи сообщений, который особенно подходит для сетевых сред с низкой пропускной способностью, большой задержкой или ненадежностью. Он был предложен Энди Стэнфорд-Кларком и Арленом Ниппером из IBM в 1999 году и позднее стал стандартом ISO (ISO/IEC PRF 20922).
Основные возможности протокола MQTT: режим публикации/подписки
- Публикация: Создатель сообщения отправляет сообщение в тему.
- Подписка: Потребитель сообщений подписывается на интересующую его тему, тем самым получая сообщения, опубликованные по этой теме.
- Брокер: MQTT использует брокер сообщений в качестве посредника для пересылки сообщений, обеспечивая разделение между издателями и подписчиками.
Публикация сообщений и подписка
Поскольку для развертывания прокси-сервера MQTT мы используем образ docker (образ eclipse-mosquitto) программного обеспечения, поддерживающего протокол MQTT, мы установили docker заранее и не будем вдаваться в подробности позже.
Перед запуском команды по развертыванию образа нам необходимо написать файл конфигурации прокси-сервераmosquitto.conf。
mosquitto.conf
# 配置端口号及远程访问IP
listener 1883 0.0.0.0
# 设置匿名访问
allow_anonymous true
Затем выполните команду развертывания:
终端
docker run --rm -p 1883:1883 -v ./mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto
После запуска образа прокси-сервера отображается следующий экран:
终端
1723012640: mosquitto version 2.0.18 starting
1723012640: Config loaded from /mosquitto/config/mosquitto.conf.
1723012640: Opening ipv4 listen socket on port 1883.
1723012640: mosquitto version 2.0.18 running
Затем мы можем протестировать стратегию и применить ее на практике.
javascript
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("mqtt://127.0.0.1:1883?topic=test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Основное применение функции Dial в коде стратегии:
javascript
Dial("mqtt://127.0.0.1:1883?topic=test_topic")
Строковый параметр функции Dial начинается сmqtt://Это имя протокола, за которым следует адрес прослушивания и порт. За символом "?" следует название темы подписки/публикации. Название темы, протестированное здесь, следующее:test_topic。
Вышеуказанная стратегия публикует и подписывается на тему одновременно. Тест выполнения показан на рисунке:
Вы также можете использовать два реальных диска для подписки друг на друга и публикации информации о теме. Мы используем такой пример в разделе практики протокола NATS и не будем повторять этот метод в других протоколах.
Протокол NATS
Протокол NATS — это простой текстовый протокол в стиле публикации/подписки. Клиент подключается к gnatsd (серверу NATS) и взаимодействует с gnatsd. Связь основана на обычных сокетах TCP/IP и определяет очень небольшой набор операций. Новая строка указывает на завершение. В отличие от традиционных систем обмена сообщениями, использующих двоичные форматы сообщений, текстовый протокол NATS упрощает реализацию клиента и может быть легко реализован на различных языках программирования или языках сценариев.
Каждый протокол имеет свои особенности. Вы можете обратиться к конкретным документам и материалам, которые не будут здесь подробно рассматриваться.
Разверните сервер протокола NATS:
docker run --name nats --rm -p 4222:4222 -p 8222:8222 nats --http_port 8222 --auth admin
Эта команда docker автоматически загрузит и запустит образ nats. Порт 4222 — это порт, к которому клиент хочет получить доступ. После развертывания образа на порту 8222 будет открыт http-монитор.
终端
Listening for client connections on 0.0.0.0:4222
Server is ready
Образ сервера NATS запускается, прослушивая порт 4222.
Связь между локальными стратегиями устройств в режиме реального времени
Нам нужно создать две стратегии (реальная торговля), назовем их пока Стратегия А и Стратегия Б. Коды этих двух стратегий в основном одинаковы. Написано на Javascript — самом простом языке для использования на платформе FMZ.
-
Стратегия А
javascriptvar connPub = null var connSub = null function main() { var robotId = _G() Log("当前实盘robotId:", robotId) connPub = Dial("nats://[email protected]:4222?topic=pubRobotA") if (!connPub) { Log("通信失败!") return } connSub = Dial("nats://[email protected]:4222?topic=pubRobotB") if (!connSub) { Log("通信失败!") return } while (true) { connPub.write("robotA发布的消息,robotId: " + robotId + ", time:" + _D()) var msgRead = connSub.read(10000) if (msgRead) { Log("msgRead:", msgRead) } LogStatus(_D()) Sleep(10000) } } function onexit() { connPub.close() connSub.close() } -
Стратегия Б
javascriptvar connPub = null var connSub = null function main() { var robotId = _G() Log("当前实盘robotId:", robotId) connPub = Dial("nats://[email protected]:4222?topic=pubRobotB") if (!connPub) { Log("通信失败!") return } connSub = Dial("nats://[email protected]:4222?topic=pubRobotA") if (!connSub) { Log("通信失败!") return } while (true) { connPub.write("robotB发布的消息,robotId: " + robotId + ", time:" + _D()) var msgRead = connSub.read(10000) if (msgRead) { Log("msgRead:", msgRead) } LogStatus(_D()) Sleep(10000) } } function onexit() { connPub.close() connSub.close() }
Эти две стратегии по сути одинаковы, за исключением того, что они публикуют и подписываются друг на друга, а подписываемые темы, публикуемые темы и публикуемая информация различаются.
Возьмем в качестве примера стратегию Б:
-
- Использование
Dial()Функция создает объект сервера клиентского соединенияconnPub, используется для публикации тематических сообщений:
var connPub = Dial("nats://[email protected]:4222?topic=pubRobotB")
Строка параметров функции Dial начинается с
nats://Указывает, что для связи используется протокол NATS, затемadminЭто простая проверочная информация, которая устанавливается при развертывании образа Docker.auth admin, используйте символ «@» для разделения следующего содержимого, а затем адрес службы и порт127.0.0.1:4222и, наконец, тема публикации/подписки:topic=pubRobotBОбратите внимание, что между предыдущим адресом стоит символ «?». - Использование
-
- Использование
Dial()Функция создает объект сервера клиентского соединенияconnSub, используется для подписки на тематические сообщения:
var connSub = Dial("nats://[email protected]:4222?topic=pubRobotA")
Единственное отличие
topic=pubRobotAОтличается, потому что вам нужно подписаться на тему, где стратегия А отправляет информациюpubRobotA。 - Использование
Создание и использование объектов подписки и публикации подключений в стратегии А аналогичны описанным выше.
Таким образом, реализуется простой пример применения протокола NATS, в котором реальный диск A и реальный диск B подписываются и публикуют сообщения для связи друг с другом.
протокол amqp
Очередь протокола AMQP
В асинхронной коммуникации сообщение не достигнет получателя немедленно, а будет сохранено в контейнере. При выполнении определенных условий сообщение будет отправлено получателю контейнером. Этот контейнер является очередью сообщений. Для выполнения этой функции , обе стороны должны Контейнер и его компоненты соответствовать единым соглашениям и правилам. AMQP — это такой протокол. И отправитель, и получатель сообщений могут достичь асинхронной связи, соблюдая этот протокол. Этот протокол определяет формат сообщений и принцип их работы.
Каждый протокол имеет свои особенности. Вы можете обратиться к конкретным документам и материалам, которые не будут здесь подробно рассматриваться.
Разверните сервер протокола amqp:
docker run --rm --hostname my-rabbit --name rabbit -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=q -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3-management
При развертывании образа Docker он автоматически загрузится и развернется, а по завершении отобразится следующее:
终端
2024-08-06 09:02:46.248936+00:00 [info] <0.9.0> Time to start RabbitMQ: 15569 ms
После развертывания образа сервера напишите тестовый пример:
javascript
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("amqp://q:[email protected]:5672/?queue=robotA_Queue")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 读取
Log("read:", conn.read(1000), "#FF0000")
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
При использовании очереди протокола AMQP, обратите внимание, что опубликованные сообщения будут сохраняться в очереди. Например, давайте запустим пример кода выше один раз. В очередь будет записано 10 сообщений. Затем, когда мы запускаем его во второй раз, мы можем обнаружить, что при чтении информация, записанная в первый раз, будет прочитана снова. Как показано на рисунке:
Вы можете видеть, что два сообщения журнала, на которые указывают красные стрелки на снимке экрана, имеют несогласованное время. Причина в том, что красное сообщение — это то, которое было прочитано и записано в очередь, когда код стратегии был запущен в первый раз.
На основе этой функции могут быть удовлетворены некоторые требования. Например, после перезапуска стратегии записанные рыночные данные по-прежнему могут быть получены из очереди для расчета инициализации и других операций.
Протокол Кафки
Apache Kafka — это распределенное хранилище данных, оптимизированное для приема и обработки потоковых данных в режиме реального времени. Потоковые данные — это данные, которые непрерывно генерируются тысячами источников данных, часто отправляющих записи данных одновременно. Стриминговая платформа должна обрабатывать эти непрерывно текущие данные и обрабатывать их шаг за шагом.
Kafka предоставляет своим пользователям три основные функции:
- Публикация и подписка на потоки записей
- Эффективно хранить поток записей в том порядке, в котором они были созданы.
- Обработка потоков записей в реальном времени
Kafka в основном используется для создания конвейеров потоковых данных в реальном времени и приложений, которые адаптируются к потокам данных. Он объединяет возможности обмена сообщениями, хранения и потоковой обработки для хранения как исторических, так и данных в реальном времени.
Публикация сообщений и подписка
Разверните образ docker прокси-сервера Kafka:
终端
docker run --rm --name kafka-server --hostname kafka-server -p 9092:9092 -p 9093:9093 \
-e KAFKA_CFG_NODE_ID=0 \
-e KAFKA_CFG_PROCESS_ROLES=controller,broker \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-server:9093 \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
bitnami/kafka:latest
Проверьте, используя тестовый код:
javascript
var conn = null
function main() {
LogReset(1)
var robotId = _G()
Log("当前实盘robotId:", robotId)
conn = Dial("kafka://localhost:9092/test_topic")
if (!conn) {
Log("通信失败!")
return
}
for (var i = 0; i < 10; i++) {
// 写入
var msg = "i: " + i + ", testQueue, robotA, robotId: " + robotId + ", time:" + _D()
conn.write(msg)
Log("向testQueue写入消息:", msg)
// 读取
Log("read:", conn.read(1000), "#FF0000")
Sleep(1000)
}
}
function onexit() {
conn.close()
Log("关闭conn")
}
Давайте рассмотрим, как использовать протокол Kafka для публикации и подписки на сообщения в функции набора номера.
javascript
Dial("kafka://localhost:9092/test_topic")
Как и у некоторых других протоколов, первая часть — это имя протокола. Затем следуйте адресу прослушивания:localhost:9092. Затем используйте символ "/" в качестве разделителя, а затем тему подписки/публикации. Здесь тестовая тема установлена наtest_topic。
Результаты теста:
- 1







