Next up is the matching service, but first the counter has to offer an order query service.

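Keeping the matching system from reading the database directly is a deliberate design choice. The matching service then does not need to know how the data is stored: from a local mock Actor for testing, to a simple single node connected to a database, to distributed data and clustered nodes, however the order-placement system evolves, the matching side's integration does not have to change.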

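This calls for some fairly involved queries. For example, given an order id, find the next record, that is, the one with the smallest id greater than the given one. Ideally the sequence would be contiguous, but in practice we cannot expect a distributed system to be reliable forever, and every node should plan for the other nodes being unreliable. Likewise, the query that returns the record for a given id has to handle the case where nothing matches.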
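The two lookups described above can be sketched in memory before touching the database. This is only an illustration under stated assumptions: `OrderFlowSketch` and its methods are hypothetical names, a `TreeMap` stands in for the order_flow table, and the content is reduced to a string.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the order_flow table: id -> content.
class OrderFlowSketch {
    private final TreeMap<Long, String> rows = new TreeMap<>();

    void save(long id, String content) {
        rows.put(id, content);
    }

    // Exact lookup; empty when no record has this id.
    Optional<String> findBy(long id) {
        return Optional.ofNullable(rows.get(id));
    }

    // The record with the smallest id strictly greater than fromId;
    // empty when no stored id is greater than fromId.
    Optional<Map.Entry<Long, String>> findNext(long fromId) {
        return Optional.ofNullable(rows.higherEntry(fromId));
    }

    public static void main(String[] args) {
        OrderFlowSketch flow = new OrderFlowSketch();
        flow.save(1L, "limit-ask");
        flow.save(2L, "limit-bid");
        flow.save(5L, "cancel"); // ids 3 and 4 are missing on purpose

        // The gap between 2 and 5 is skipped rather than treated as an error.
        System.out.println(flow.findNext(2L).map(Map.Entry::getKey));
        System.out.println(flow.findNext(5L).isPresent());
        System.out.println(flow.findBy(3L).isPresent());
    }
}
```

The point of the sketch is that neither lookup assumes contiguous ids: a gap is simply skipped, and an absent record is an ordinary, representable outcome rather than an exception.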

Here neither clojure.java.jdbc nor Java's Hibernate stack can help much. We still have to write a bit of query by hand.

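We should accept one fact: every software developer should try to keep their range of knowledge "one size larger" than what they use day to day. Even though database access keeps getting more convenient, application programmers, and back-end developers in particular, should still know something about writing SQL, and should not confine themselves to the MySQL family. By the same token, knowing only MSSQL or only Oracle is not enough either. Even from a purely practical standpoint, being fluent in the SQL of one server-style relational database and of one in-memory, embedded relational database (SQLite, for example) noticeably improves productivity.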

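Another example: I am no JVM expert, and my work never involves fine-grained quantitative JVM tuning, yet sixteen years ago my first contact with a Java project began with reverse-engineering a vendor's Java SDK in order to reimplement it in C#. We should not expect to learn a technology once and live off it for life, nor should we hope to preserve our own value by holding the team back. The market competes and the fittest survive. I have seen with my own eyes a company that slept soundly for five or six years and then discovered, to its shock, that it had missed the chance to upgrade its technology and could only be eliminated. A company can go bankrupt; how does an individual choose a future? In the end, opportunity goes to those who are prepared. I hope everyone can appreciate that learning, for teams and for individuals, matters a great deal. We all know that regular exercise is the foundation of staying healthy, and persistent learning works the same way.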

Here we add to order.clj a lookup for the record whose id is the smallest one greater than a given id. The query is:

with last as (select min(id) as id from order_flow where id > ?)
select order_flow.id, content, price
from order_flow join last on order_flow.id = last.id;

The corresponding Clojure code, written with the jaskell.sql tools, is:

(def find-last-query
  (-> (with [:last as
             (select (f :min :id) :as :id
                     from :order_flow
                     where :id :> (p 0))]
            select [:order_flow.id :content :price]
            from :order_flow
            join :last on :order_flow.id := :last.id)
      (.cache)))

Now we can write load-order, the inverse of place-order:

(defmulti load-order (fn [data] (get-in data [:content "category"])))

(defmethod load-order "limit-ask" [data]
  (doto (LimitAsk.)
    (.setId (:id data))
    (.setPrice (:price data))
    (.setSymbol (get-in data [:content "symbol"]))
    (.setQuantity (get-in data [:content "quantity"]))
    (.setCompleted (get-in data [:content "completed"]))
    (.setAccountId (get-in data [:content "account-id"]))))

(defmethod load-order "limit-bid" [data]
  (doto (LimitBid.)
    (.setId (:id data))
    (.setPrice (:price data))
    (.setSymbol (get-in data [:content "symbol"]))
    (.setQuantity (get-in data [:content "quantity"]))
    (.setCompleted (get-in data [:content "completed"]))
    (.setAccountId (get-in data [:content "account-id"]))))

(defmethod load-order "market-ask" [data]
  (doto (MarketAsk.)
    (.setId (:id data))
    (.setSymbol (get-in data [:content "symbol"]))
    (.setQuantity (get-in data [:content "quantity"]))
    (.setCompleted (get-in data [:content "completed"]))
    (.setAccountId (get-in data [:content "account-id"]))))

(defmethod load-order "market-bid" [data]
  (doto (MarketBid.)
    (.setId (:id data))
    (.setSymbol (get-in data [:content "symbol"]))
    (.setQuantity (get-in data [:content "quantity"]))
    (.setCompleted (get-in data [:content "completed"]))
    (.setAccountId (get-in data [:content "account-id"]))))

(defmethod load-order "cancel" [data]
  (doto (Cancel.)
    (.setId (:id data))
    (.setSymbol (get-in data [:content "symbol"]))
    (.setAccountId (get-in data [:content "account-id"]))
    (.setOrderId (get-in data [:content "order-id"]))))
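For readers more at home in Java, the multimethod above behaves roughly like a dispatch table keyed by the "category" field. The sketch below is only an analogy: `LoadOrderSketch` is a hypothetical name, and each loader returns a description string instead of building the real LimitAsk, LimitBid, or Cancel message.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: a dispatch table keyed by the "category" field,
// playing the role that defmulti/defmethod play in the Clojure code.
class LoadOrderSketch {
    private static final Map<String, Function<Map<String, Object>, String>> LOADERS =
            new HashMap<>();

    static {
        // A real loader would construct the message object and call its setters.
        LOADERS.put("limit-ask",
                c -> "LimitAsk " + c.get("symbol") + " x" + c.get("quantity"));
        LOADERS.put("limit-bid",
                c -> "LimitBid " + c.get("symbol") + " x" + c.get("quantity"));
        LOADERS.put("cancel",
                c -> "Cancel order " + c.get("order-id"));
    }

    // Picks the loader by category; an unknown category is rejected loudly.
    static String load(Map<String, Object> content) {
        Object category = content.get("category");
        Function<Map<String, Object>, String> loader = LOADERS.get(category);
        if (loader == null) {
            throw new IllegalArgumentException("unknown category: " + category);
        }
        return loader.apply(content);
    }

    public static void main(String[] args) {
        Map<String, Object> content = new HashMap<>();
        content.put("category", "cancel");
        content.put("order-id", 9);
        System.out.println(load(content));
    }
}
```

Adding a new order type then means registering one more loader, which is the same extension point a new defmethod gives the Clojure version.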

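When saving, we first call place-order to produce a consistent JSON document and then call save to store it. Reading goes the other way: the data returned by find-by or find-next is turned into the query result either through load-order or by constructing an OrderNoMore/OrderNotFound. The code for find-by and find-next is: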

(defn find-by
  [order-id]
  (if-some [data (j/get-by-id @db :order_flow order-id)]
    (load-order data)
    (doto (OrderNotFound.)
      (.setId order-id))))

(defn find-next
  [from-id]
  (let [data (j/query @db [(.script find-last-query) from-id])]
    (if (not-empty data)
      (load-order (first data))
      (doto (OrderNoMore.)
        (.setPositionId from-id)))))
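One way a consumer might use find-next is as a cursor: ask for the record after the current position, then feed the returned id back in as the new position until nothing more comes back. The sketch below assumes that usage; `CursorWalkSketch`, `finderOver`, and the `NO_MORE` sentinel are hypothetical stand-ins for the real query and the OrderNoMore reply.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;
import java.util.function.LongUnaryOperator;

class CursorWalkSketch {
    // Hypothetical sentinel standing in for an OrderNoMore reply.
    static final long NO_MORE = -1L;

    // Hypothetical in-memory finder: smallest stored id greater than the position.
    static LongUnaryOperator finderOver(long... ids) {
        TreeSet<Long> stored = new TreeSet<>();
        for (long id : ids) {
            stored.add(id);
        }
        return position -> {
            Long next = stored.higher(position);
            return next == null ? NO_MORE : next;
        };
    }

    // Walks the flow: each returned id becomes the next position,
    // and the walk stops at the first NO_MORE.
    static List<Long> drain(LongUnaryOperator findNext, long startPosition) {
        List<Long> seen = new ArrayList<>();
        long position = startPosition;
        long next;
        while ((next = findNext.applyAsLong(position)) != NO_MORE) {
            seen.add(next);
            position = next;
        }
        return seen;
    }

    public static void main(String[] args) {
        // Ids 3 and 4 are absent; the walk still visits 1, 2 and 5 in order.
        System.out.println(drain(finderOver(1, 2, 5), 0));
    }
}
```

Because the position carried forward is always the id that was actually returned, missing ids never stall the walk, and a "no more" answer just means the consumer can retry later from the same position.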

With these two functions we can... hold on, let's write a test first. We start by preparing some test data:

(ns liu.mars.market.test-data)

(def sym "btcusdt")

(def note-paper
  {:limit-ask  [{:id 1 :symbol sym :price 34522M :quantity 1 :account-id 3223421}
                {:id 2 :symbol sym :price 34512M :quantity 10001 :account-id 3223421}
                {:id 3 :symbol sym :price 34525M :quantity 10020 :account-id 34223421}
                {:id 4 :symbol sym :price 34562M :quantity 1000 :account-id 3422341}
                {:id 5 :symbol sym :price 44522M :quantity 10000 :account-id 34223421}]
   :limit-bid  [{:id 6 :symbol sym :price 24522M :quantity 1 :account-id 34223421}
                {:id 7 :symbol sym :price 3412M :quantity 10001 :account-id 34223421}
                {:id 8 :symbol sym :price 32525M :quantity 10020 :account-id 34223421}
                {:id 9 :symbol sym :price 31562M :quantity 1000 :account-id 34223421}
                {:id 10 :symbol sym :price 1522M :quantity 9999 :account-id 34223421}]
   :market-ask [{:id 11 :symbol sym :quantity 1 :account-id 34223421}
                {:id 12 :symbol sym :quantity 10001 :account-id 3422342}
                {:id 13 :symbol sym :quantity 10020 :account-id 34223421}
                {:id 14 :symbol sym :quantity 1000 :account-id 3423421}
                {:id 15 :symbol sym :quantity 10000 :account-id 34223421}]
   :market-bid [{:id 16 :symbol sym :quantity 12433 :account-id 34223421}
                {:id 17 :symbol sym :quantity 10001 :account-id 34223421}
                {:id 18 :symbol sym :quantity 10020 :account-id 34223421}
                {:id 19 :symbol sym :quantity 1000 :account-id 34223421}
                {:id 20 :symbol sym :quantity 9999 :account-id 3422421}]
   :cancel     [{:id 21 :symbol sym :account-id 4223421 :order-id 23341}
                {:id 22 :symbol sym :account-id 3423421 :order-id 23342}
                {:id 23 :symbol sym :account-id 3422321 :order-id 2341}
                {:id 24 :symbol sym :account-id 3423421 :order-id 23}
                {:id 25 :symbol sym :account-id 3423421 :order-id 9}]})

Later tests reuse this data set. Next comes the test code:

(ns liu.mars.market.inner-find-test
  (:require [clojure.java.jdbc :as j])
  (:require [liu.mars.market.order :refer :all])
  (:require [clojure.test :refer :all])
  (:require [liu.mars.market.test-data :as data]
            [liu.mars.market.config :as cfg])
  (:import (liu.mars.market.messages LimitAsk LimitBid MarketAsk MarketBid)))

(testing "inner testing for order module"
  (j/delete! @cfg/db :order_flow ["id < ?" 26])
  (testing "tests for limit ask orders save and reload"
    (doseq [item (:limit-ask data/note-paper)]
      (let [data (assoc item :completed 0 :category "limit-ask")]
        (save data)
        (let [order (find-by (:id data))]
          (is (instance? LimitAsk order))
          (is (= (:id data) (.getId order)))
          (is (= 0 (:completed data) (.getCompleted order)))
          (is (= (:quantity data) (.getQuantity order)))
          (is (= (:account-id data) (.getAccountId order)))
          (is (= (:symbol data) (.getSymbol order)))
          (is (= (:price data) (.getPrice order)))))))
  (testing "tests for limit bid orders save and reload"
    (doseq [item (:limit-bid data/note-paper)]
      (let [data (assoc item :completed 0 :category "limit-bid")]
        (save data)
        (let [order (find-by (:id data))]
          (is (instance? LimitBid order))
          (is (= (:id data) (.getId order)))
          (is (= 0 (:completed data) (.getCompleted order)))
          (is (= (:quantity data) (.getQuantity order)))
          (is (= (:account-id data) (.getAccountId order)))
          (is (= (:symbol data) (.getSymbol order)))
          (is (= (:price data) (.getPrice order)))))))
  (testing "tests for market ask orders save and reload"
    (doseq [item (:market-ask data/note-paper)]
      (let [data (assoc item :completed 0 :category "market-ask")]
        (save data)
        (let [order (find-by (:id data))]
          (is (instance? MarketAsk order))
          (is (= (:id data) (.getId order)))
          (is (= 0 (:completed data) (.getCompleted order)))
          (is (= (:quantity data) (.getQuantity order)))
          (is (= (:account-id data) (.getAccountId order)))
          (is (= (:symbol data) (.getSymbol order)))))))
  (testing "tests for market bid orders save and reload"
    (doseq [item (:market-bid data/note-paper)]
      (let [data (assoc item :completed 0 :category "market-bid")]
        (save data)
        (let [order (find-by (:id data))]
          (is (instance? MarketBid order))
          (is (= (:id data) (.getId order)))
          (is (= 0 (:completed data) (.getCompleted order)))
          (is (= (:account-id data) (.getAccountId order)))
          (is (= (:quantity data) (.getQuantity order)))
          (is (= (:symbol data) (.getSymbol order))))))))

Then we define a new Actor responsible for the query service:

package liu.mars.market;

import akka.actor.AbstractActor;
import akka.actor.Props;
import clojure.lang.IFn;
import jaskell.util.CR;
import liu.mars.market.messages.FindOrder;
import liu.mars.market.messages.NextOrder;

public class PeekActor extends AbstractActor {
    private static final String order_namespace = "liu.mars.market.order";
    private IFn next;
    private IFn find;
    static  {
        CR.require(order_namespace);
    }

    public static Props props() {
        return Props.create(PeekActor.class, PeekActor::new);
    }

    private PeekActor() {
        this.find = CR.var(order_namespace, "find-by");
        this.next = CR.var(order_namespace, "find-next");
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder().match(NextOrder.class, msg -> {
            sender().tell(next.invoke(msg.getPositionId()), self());
        }).match(FindOrder.class, msg -> {
            sender().tell(find.invoke(msg.getId()), self());
        }).build();
    }
}

The corresponding test is:

(ns liu.mars.market.local-find-test
  (:require [clojure.test :refer :all])
  (:require [clojure.java.jdbc :as j])
  (:require [liu.mars.market.test-data :as data])
  (:require [liu.mars.market.config :as cfg])
  (:require [liu.mars.market.order :as o])
  (:import (akka.actor ActorSystem)
           (akka.testkit.javadsl TestKit)
           (liu.mars.market PeekActor)
           (liu.mars.market.messages FindOrder LimitAsk LimitBid MarketAsk MarketBid NextOrder Cancel)
           (java.util.function Supplier Function)))

(testing "tests for find action by actor in local system"
  (j/delete! @cfg/db :order_flow ["id < ?" 26])
  (let [system (ActorSystem/create "test")
        test-kit (TestKit. system)
        await #(.awaitCond test-kit (reify Supplier (get [this] (.msgAvailable test-kit))))
        self (.getRef test-kit)
        actor (.actorOf system (PeekActor/props))]
    (doseq [item (:limit-ask data/note-paper)]
      (let [data (assoc item :completed 0 :category "limit-ask")
            message (doto (FindOrder.)
                      (.setId (:id item)))]
        (o/save data)
        (.tell actor message self)
        (await)
        (.expectMsgPF test-kit "should get limit ask order as save"
                      (reify Function
                        (apply [this msg]
                          (is (instance? LimitAsk msg))
                          (is (= (:id item) (.getId msg)))
                          (is (= 0 (:completed data) (.getCompleted msg)))
                          (is (= (:quantity data) (.getQuantity msg)))
                          (is (= (:account-id data) (.getAccountId msg)))
                          (is (= (:symbol data) (.getSymbol msg)))
                          (is (= (:price data) (.getPrice msg))))))))
    (doseq [item (:limit-bid data/note-paper)]
      (let [data (assoc item :completed 0 :category "limit-bid")
            message (doto (FindOrder.)
                      (.setId (:id item)))]
        (o/save data)
        (.tell actor message self)
        (await)
        (.expectMsgPF test-kit "should get limit bid order as save"
                      (reify Function
                        (apply [this msg]
                          (is (instance? LimitBid msg))
                          (is (= (:id item) (.getId msg)))
                          (is (= 0 (:completed data) (.getCompleted msg)))
                          (is (= (:quantity data) (.getQuantity msg)))
                          (is (= (:symbol data) (.getSymbol msg)))
                          (is (= (:account-id data) (.getAccountId msg)))
                          (is (= (:price data) (.getPrice msg))))))))
    (doseq [item (:market-ask data/note-paper)]
      (let [data (assoc item :completed 0 :category "market-ask")
            message (doto (FindOrder.)
                      (.setId (:id item)))]
        (o/save data)
        (.tell actor message self)
        (await)
        (.expectMsgPF test-kit "should get market ask order as save"
                      (reify Function
                        (apply [this msg]
                          (is (instance? MarketAsk msg))
                          (is (= (:id item) (.getId msg)))
                          (is (= 0 (:completed data) (.getCompleted msg)))
                          (is (= (:quantity data) (.getQuantity msg)))
                          (is (= (:account-id data) (.getAccountId msg)))
                          (is (= (:symbol data) (.getSymbol msg))))))))
    (doseq [item (:market-bid data/note-paper)]
      (let [data (assoc item :completed 0 :category "market-bid")
            message (doto (FindOrder.)
                      (.setId (:id item)))]
        (o/save data)
        (.tell actor message self)
        (await)
        (.expectMsgPF test-kit "should get market bid order as save"
                      (reify Function
                        (apply [this msg]
                          (is (instance? MarketBid msg))
                          (is (= (:id item) (.getId msg)))
                          (is (= 0 (:completed data) (.getCompleted msg)))
                          (is (= (:account-id data) (.getAccountId msg)))
                          (is (= (:quantity data) (.getQuantity msg)))
                          (is (= (:symbol data) (.getSymbol msg))))))))
    (doseq [item (:cancel data/note-paper)]
      (let [data (assoc item :completed 0 :category "cancel")
            message (doto (FindOrder.)
                      (.setId (:id item)))]
        (o/save data)
        (.tell actor message self)
        (await)
        (.expectMsgPF test-kit "should get cancel order as save"
                      (reify Function
                        (apply [this msg]
                          (is (instance? Cancel msg))
                          (is (= (:id item) (.getId msg)))
                          (is (= (:account-id data) (.getAccountId msg)))
                          (is (= (:order-id data) (.getOrderId msg)))
                          (is (= (:symbol data) (.getSymbol msg))))))))
    (TestKit/shutdownActorSystem system)))

(testing "tests for find next action by actor in local system"
  (j/delete! @cfg/db :order_flow ["id < ?" 26])
  (let [system (ActorSystem/create "test")
        test-kit (TestKit. system)
        await #(.awaitCond test-kit (reify Supplier (get [this] (.msgAvailable test-kit))))
        self (.getRef test-kit)
        actor (.actorOf system (PeekActor/props))]
    (doseq [item (:limit-ask data/note-paper)]
      (let [data (assoc item :completed 0 :category "limit-ask")
            message (doto (NextOrder.)
                      (.setPositionId (- (:id item) 1)))]
        (o/save data)
        (.tell actor message self)
        (await)
        (.expectMsgPF test-kit "should get limit ask order as save"
                      (reify Function
                        (apply [this msg]
                          (is (instance? LimitAsk msg))
                          (is (= (:id item) (.getId msg)))
                          (is (= 0 (:completed data) (.getCompleted msg)))
                          (is (= (:account-id data) (.getAccountId msg)))
                          (is (= (:quantity data) (.getQuantity msg)))
                          (is (= (:symbol data) (.getSymbol msg)))
                          (is (= (:price data) (.getPrice msg))))))))
    (doseq [item (:limit-bid data/note-paper)]
      (let [data (assoc item :completed 0 :category "limit-bid")
            message (doto (NextOrder.)
                      (.setPositionId (- (:id item) 1)))]
        (o/save data)
        (.tell actor message self)
        (await)
        (.expectMsgPF test-kit "should get limit bid order as save"
                      (reify Function
                        (apply [this msg]
                          (is (instance? LimitBid msg))
                          (is (= (:id item) (.getId msg)))
                          (is (= 0 (:completed data) (.getCompleted msg)))
                          (is (= (:account-id data) (.getAccountId msg)))
                          (is (= (:quantity data) (.getQuantity msg)))
                          (is (= (:symbol data) (.getSymbol msg)))
                          (is (= (:price data) (.getPrice msg))))))))
    (doseq [item (:market-ask data/note-paper)]
      (let [data (assoc item :completed 0 :category "market-ask")
            message (doto (NextOrder.)
                      (.setPositionId (- (:id item) 1)))]
        (o/save data)
        (.tell actor message self)
        (await)
        (.expectMsgPF test-kit "should get market ask order as save"
                      (reify Function
                        (apply [this msg]
                          (is (instance? MarketAsk msg))
                          (is (= (:id item) (.getId msg)))
                          (is (= 0 (:completed data) (.getCompleted msg)))
                          (is (= (:account-id data) (.getAccountId msg)))
                          (is (= (:quantity data) (.getQuantity msg)))
                          (is (= (:symbol data) (.getSymbol msg))))))))
    (doseq [item (:market-bid data/note-paper)]
      (let [data (assoc item :completed 0 :category "market-bid")
            message (doto (NextOrder.)
                      (.setPositionId (- (:id item) 1)))]
        (o/save data)
        (.tell actor message self)
        (await)
        (.expectMsgPF test-kit "should get market bid order as save"
                      (reify Function
                        (apply [this msg]
                          (is (instance? MarketBid msg))
                          (is (= (:id item) (.getId msg)))
                          (is (= (:account-id data) (.getAccountId msg)))
                          (is (= 0 (:completed data) (.getCompleted msg)))
                          (is (= (:quantity data) (.getQuantity msg)))
                          (is (= (:symbol data) (.getSymbol msg))))))))
    (doseq [item (:cancel data/note-paper)]
      (let [data (assoc item :completed 0 :category "cancel")
            message (doto (NextOrder.)
                      (.setPositionId (- (:id item) 1)))]
        (o/save data)
        (.tell actor message self)
        (await)
        (.expectMsgPF test-kit "should get cancel order as save"
                      (reify Function
                        (apply [this msg]
                          (is (instance? Cancel msg))
                          (is (= (:id item) (.getId msg)))
                          (is (= (:account-id data) (.getAccountId msg)))
                          (is (= (:order-id data) (.getOrderId msg)))
                          (is (= (:symbol data) (.getSymbol msg))))))))
    (TestKit/shutdownActorSystem system)))

Next we modify the entry program so that the counter service offers both order placement and querying:

package liu.mars.market;

import akka.actor.ActorSystem;
import jaskell.util.CR;

import java.io.IOException;

public class CounterApp {
    public static void main(String[] args) throws IOException {
        String config_namespace = "liu.mars.market.config";
        CR.require(config_namespace);
        String path = (String)CR.var(config_namespace, "sequences").get();
        ActorSystem system = ActorSystem.create("counter");
        var placeRef = system.actorOf(PlaceActor.props(path), "place");
        var peekRef = system.actorOf(PeekActor.props(), "peek");
        system.registerOnTermination(()->{
            system.stop(placeRef);
            system.stop(peekRef);
        });
        System.out.println("Ctrl+c if want to stop");
    }
}

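Now we can write a set of tests that reach a running counter over the network and exercise the whole life cycle from placing an order to querying it. For this we define a profile for the development-machine environment and change project.clj as follows: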

(defproject counter "0.3"
  :description "FIXME: write description"
  :url "http://example.com/FIXME"
  :license {:name "Eclipse Public License"
            :url  "http://www.eclipse.org/legal/epl-v10.html"}
  :plugins [[lein-junit "1.1.8"]]
  :source-paths ["src/main/clojure"]
  :java-source-paths ["src/main/java"]
  :dependencies [[org.clojure/clojure "1.10.0"]
                 [com.typesafe.akka/akka-actor_2.12 "2.5.19"]
                 [com.typesafe.akka/akka-remote_2.12 "2.5.19"]
                 [liu.mars/jaskell "0.1.2"]
                 [liu.mars/market-messages "0.2"]
                 [org.postgresql/postgresql "42.2.5"]
                 [clj-postgresql "0.7.0"]
                 [org.clojure/java.jdbc "0.7.8"]
                 [com.fasterxml.jackson.core/jackson-core "2.9.6"]
                 [com.fasterxml.jackson.core/jackson-databind "2.9.6"]
                 [com.github.romix.akka/akka-kryo-serialization_2.12 "0.5.2"]]
  :test-paths ["src/test/clojure" "src/test/java"]
  :resource-paths ["resources/main"]
  :junit ["src/test/java"]
  :aot :all
  :uberjar-merge-with {#".properties$" [slurp str spit] "reference.conf" [slurp str spit]}
  :profiles {:server  {:main           liu.mars.market.CounterApp
                       :jvm-opts       ["-Dconfig.resource=server.conf"]
                       :resource-paths ["resources/server"]}
             :local   {:main           liu.mars.market.CounterApp
                       :jvm-opts       ["-Dconfig.resource=server.conf"]
                       :resource-paths ["resources/local"]}
             :test    {:dependencies      [[junit/junit "4.12"]
                                           [com.typesafe.akka/akka-testkit_2.12 "2.5.19"]]
                       :resource-paths    ["resources/test"]
                       :java-source-paths ["src/test/java"]
                       :jvm-opts          ["-Dconfig.resource=test.conf"]}
             :dev     {:resource-paths ["resources/dev"]
                       :source-paths   ["src/notebook"]
                       :jvm-opts       ["-Dconfig.resource=dev.conf"]}
             :gorilla {:plugins [[org.clojars.benfb/lein-gorilla "0.5.3"]]}})

The corresponding config.edn:

{:db-spec   {:dbtype   "postgresql"
             :dbname   "counter"}
 :sequences "akka.tcp://sequences@192.168.50.22:25520/user/sequences"}

And server.conf:

akka {
  extensions = ["com.romix.akka.serialization.kryo.KryoSerializationExtension$"]

  actor {
    provider = remote

    serializers {
      java = "akka.serialization.JavaSerializer"
      kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
    }

    serialization-bindings {
      "liu.mars.market.messages.CreateSequence" = kryo
      "liu.mars.market.messages.DropSequence" = kryo
      "liu.mars.market.messages.ListSequences" = kryo
      "liu.mars.market.messages.NextValue" = kryo
      "liu.mars.market.messages.LimitAsk" = kryo
      "liu.mars.market.messages.LimitBid" = kryo
      "liu.mars.market.messages.MarketAsk" = kryo
      "liu.mars.market.messages.MarketBid" = kryo
      "liu.mars.market.messages.Cancel" = kryo
      "liu.mars.market.messages.FindOrder" = kryo
      "liu.mars.market.messages.NextOrder" = kryo
      "liu.mars.market.messages.OrderNotFound" = kryo
      "liu.mars.market.messages.OrderNoMore" = kryo
      "com.fasterxml.jackson.databind.node.ObjectNode" = kryo
      "com.fasterxml.jackson.databind.node.ArrayNode" = kryo
      "clojure.lang.PersistentArrayMap" = kryo
      "clojure.lang.PersistentList" = kryo
      "clojure.lang.PersistentVector" = kryo
      "clojure.lang.LazySeq" = kryo
      "clojure.lang.Keyword" = kryo
      "clojure.lang.Symbol" = kryo
      "java.util.ArrayList" = kryo
    }

    kryo  {
      type = "graph"
      idstrategy = "incremental"
      buffer-size = 4096
      max-buffer-size = -1
      kryo-custom-serializer-init = "liu.mars.market.KryoInit"
      kryo-trace = false
    }

  }

  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 25530
    }
  }

}

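Note that the kryo configuration has been extended here as well; the other config files need the same extension to add the types involved in the new features. I won't go through them one by one.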

Start the service first: lein with-profile +local run . Then run the following tests:

(ns liu.mars.market.local-remote-test
  "Run with the local profile: please run `lein with-profile +local run` first to start the server locally."
  (:require [clojure.test :refer :all])
  (:require [clojure.java.jdbc :as j])
  (:require [liu.mars.market.test-data :as data])
  (:require [liu.mars.market.config :as cfg])
  (:require [liu.mars.market.order :as o])
  (:import (akka.actor ActorSystem)
           (akka.testkit.javadsl TestKit)
           (liu.mars.market.messages FindOrder LimitAsk LimitBid MarketAsk MarketBid NextOrder Cancel)
           (java.util.function Supplier Function)))

(testing "tests for place and find actions by actor"
  (let [system (ActorSystem/create "test")
        test-kit (TestKit. system)
        await #(.awaitCond test-kit (reify Supplier (get [this] (.msgAvailable test-kit))))
        self (.getRef test-kit)
        host "127.0.0.1"
        place-actor (.actorSelection system (str "akka.tcp://counter@" host ":25530/user/place"))
        peek-actor (.actorSelection system (str "akka.tcp://counter@" host ":25530/user/peek"))]
    (doseq [item (:limit-ask data/note-paper)]
      (let [post (doto (LimitAsk.)
                   (.setPrice (:price item))
                   (.setQuantity (:quantity item))
                   (.setAccountId (:account-id item))
                   (.setSymbol (:symbol item)))
            get  (FindOrder.)]
        (.tell place-actor post self)
        (await)
        (.expectMsgPF test-kit "should get new order id from place actor"
                      (reify Function
                        (apply [this msg]
                          (is (instance? Long msg))
                          (.setId get msg))))
        (.tell peek-actor get self)
        (await)
        (.expectMsgPF test-kit "should get limit ask order as save"
                      (reify Function
                        (apply [this msg]
                          (is (instance? LimitAsk msg))
                          (is (= (.getId get) (.getId msg)))
                          (is (= 0 (.getCompleted msg)))
                          (is (= (:quantity item) (.getQuantity msg)))
                          (is (= (:account-id item) (.getAccountId msg)))
                          (is (= (:symbol item) (.getSymbol msg)))
                          (is (= (:price item) (.getPrice msg))))))))
    (doseq [item (:limit-bid data/note-paper)]
      (let [post (doto (LimitBid.)
                   (.setPrice (:price item))
                   (.setQuantity (:quantity item))
                   (.setAccountId (:account-id item))
                   (.setSymbol (:symbol item)))
            get  (FindOrder.)]
        (.tell place-actor post self)
        (await)
        (.expectMsgPF test-kit "should get new order id from place actor"
                      (reify Function
                        (apply [this msg]
                          (is (instance? Long msg))
                          (.setId get msg))))
        (.tell peek-actor get self)
        (await)
        (.expectMsgPF test-kit "should get limit bid order as save"
                      (reify Function
                        (apply [this msg]
                          (is (instance? LimitBid msg))
                          (is (= (.getId get) (.getId msg)))
                          (is (= 0 (.getCompleted msg)))
                          (is (= (:quantity item) (.getQuantity msg)))
                          (is (= (:account-id item) (.getAccountId msg)))
                          (is (= (:symbol item) (.getSymbol msg)))
                          (is (= (:price item) (.getPrice msg))))))))
    (doseq [item (:market-ask data/note-paper)]
      (let [post (doto (MarketAsk.)
                   (.setQuantity (:quantity item))
                   (.setAccountId (:account-id item))
                   (.setSymbol (:symbol item)))
            get  (FindOrder.)]
        (.tell place-actor post self)
        (await)
        (.expectMsgPF test-kit "should get new order id from place actor"
                      (reify Function
                        (apply [this msg]
                          (is (instance? Long msg))
                          (.setId get msg))))
        (.tell peek-actor get self)
        (await)
        (.expectMsgPF test-kit "should get market ask order as save"
                      (reify Function
                        (apply [this msg]
                          (is (instance? MarketAsk msg))
                          (is (= (.getId get) (.getId msg)))
                          (is (= 0 (.getCompleted msg)))
                          (is (= (:quantity item) (.getQuantity msg)))
                          (is (= (:account-id item) (.getAccountId msg)))
                          (is (= (:symbol item) (.getSymbol msg))))))))
    (doseq [item (:market-bid data/note-paper)]
      (let [post (doto (MarketBid.)
                   (.setQuantity (:quantity item))
                   (.setAccountId (:account-id item))
                   (.setSymbol (:symbol item)))
            get  (FindOrder.)]
        (.tell place-actor post self)
        (await)
        (.expectMsgPF test-kit "should get new order id from place actor"
                      (reify Function
                        (apply [this msg]
                          (is (instance? Long msg))
                          (.setId get msg))))
        (.tell peek-actor get self)
        (await)
        (.expectMsgPF test-kit "should get market bid order as save"
                      (reify Function
                        (apply [this msg]
                          (is (instance? MarketBid msg))
                          (is (= (.getId get) (.getId msg)))
                          (is (= 0 (.getCompleted msg)))
                          (is (= (:quantity item) (.getQuantity msg)))
                          (is (= (:account-id item) (.getAccountId msg)))
                          (is (= (:symbol item) (.getSymbol msg))))))))
    (doseq [item (:cancel data/note-paper)]
      (let [post (doto (Cancel.)
                   (.setAccountId (:account-id item))
                   (.setSymbol (:symbol item))
                   (.setOrderId (:order-id item)))
            get  (FindOrder.)]
        (.tell place-actor post self)
        (await)
        (.expectMsgPF test-kit "should get new order id from place actor"
                      (reify Function
                        (apply [this msg]
                          (is (instance? Long msg))
                          (.setId get msg))))
        (.tell peek-actor get self)
        (await)
        (.expectMsgPF test-kit "should get cancel order as save"
                      (reify Function
                        (apply [this msg]
                          (is (instance? Cancel msg))
                          (is (= (.getId get) (.getId msg)))
                          (is (= (:order-id item) (.getOrderId msg)))
                          (is (= (:account-id item) (.getAccountId msg)))
                          (is (= (:symbol item) (.getSymbol msg))))))))

    (doseq [item (:limit-ask data/note-paper)]
      (let [post (doto (LimitAsk.)
                   (.setPrice (:price item))
                   (.setQuantity (:quantity item))
                   (.setAccountId (:account-id item))
                   (.setSymbol (:symbol item)))
            get  (NextOrder.)]
        (.tell place-actor post self)
        (await)
        (.expectMsgPF test-kit "should get new order id from place actor"
                      (reify Function
                        (apply [this msg]
                          (is (instance? Long msg))
                          (.setPositionId get (- msg 1)))))
        (.tell peek-actor get self)
        (await)
        (.expectMsgPF test-kit "should get limit ask order as save"
                      (reify Function
                        (apply [this msg]
                          (is (instance? LimitAsk msg))
                          (is (= (+ (.getPositionId get) 1) (.getId msg)))
                          (is (= 0 (.getCompleted msg)))
                          (is (= (:quantity item) (.getQuantity msg)))
                          (is (= (:account-id item) (.getAccountId msg)))
                          (is (= (:symbol item) (.getSymbol msg)))
                          (is (= (:price item) (.getPrice msg))))))))
    (doseq [item (:limit-bid data/note-paper)]
      (let [post (doto (LimitBid.)
                   (.setPrice (:price item))
                   (.setQuantity (:quantity item))
                   (.setAccountId (:account-id item))
                   (.setSymbol (:symbol item)))
            get  (NextOrder.)]
        (.tell place-actor post self)
        (await)
        (.expectMsgPF test-kit "should get new order id from place actor"
                      (reify Function
                        (apply [this msg]
                          (is (instance? Long msg))
                          (.setPositionId get (- msg 1)))))
        (.tell peek-actor get self)
        (await)
        (.expectMsgPF test-kit "should get limit bid order as save"
                      (reify Function
                        (apply [this msg]
                          (is (instance? LimitBid msg))
                          (is (= (inc (.getPositionId get)) (.getId msg)))
                          (is (= 0 (.getCompleted msg)))
                          (is (= (:quantity item) (.getQuantity msg)))
                          (is (= (:account-id item) (.getAccountId msg)))
                          (is (= (:symbol item) (.getSymbol msg)))
                          (is (= (:price item) (.getPrice msg))))))))
    (doseq [item (:market-ask data/note-paper)]
      (let [post (doto (MarketAsk.)
                   (.setQuantity (:quantity item))
                   (.setAccountId (:account-id item))
                   (.setSymbol (:symbol item)))
            get  (NextOrder.)]
        (.tell place-actor post self)
        (await)
        (.expectMsgPF test-kit "should get new order id from place actor"
                      (reify Function
                        (apply [this msg]
                          (is (instance? Long msg))
                          (.setPositionId get (- msg 1)))))
        (.tell peek-actor get self)
        (await)
        (.expectMsgPF test-kit "should get market ask order as save"
                      (reify Function
                        (apply [this msg]
                          (is (instance? MarketAsk msg))
                          (is (= (+ (.getPositionId get) 1) (.getId msg)))
                          (is (= 0 (.getCompleted msg)))
                          (is (= (:quantity item) (.getQuantity msg)))
                          (is (= (:account-id item) (.getAccountId msg)))
                          (is (= (:symbol item) (.getSymbol msg))))))))
    (doseq [item (:market-bid data/note-paper)]
      (let [post (doto (MarketBid.)
                   (.setQuantity (:quantity item))
                   (.setAccountId (:account-id item))
                   (.setSymbol (:symbol item)))
            get  (NextOrder.)]
        (.tell place-actor post self)
        (await)
        (.expectMsgPF test-kit "should get new order id from place actor"
                      (reify Function
                        (apply [this msg]
                          (is (instance? Long msg))
                          (.setPositionId get (- msg 1)))))
        (.tell peek-actor get self)
        (await)
        (.expectMsgPF test-kit "should get market bid order as save"
                      (reify Function
                        (apply [this msg]
                          (is (instance? MarketBid msg))
                          (is (= (+ (.getPositionId get) 1) (.getId msg)))
                          (is (= 0 (.getCompleted msg)))
                          (is (= (:quantity item) (.getQuantity msg)))
                          (is (= (:account-id item) (.getAccountId msg)))
                          (is (= (:symbol item) (.getSymbol msg))))))))
    (doseq [item (:cancel data/note-paper)]
      (let [post (doto (Cancel.)
                   (.setAccountId (:account-id item))
                   (.setSymbol (:symbol item))
                   (.setOrderId (:order-id item)))
            get  (NextOrder.)]
        (.tell place-actor post self)
        (await)
        (.expectMsgPF test-kit "should get new order id from place actor"
                      (reify Function
                        (apply [this msg]
                          (is (instance? Long msg))
                          (.setPositionId get (- msg 1)))))
        (.tell peek-actor get self)
        (await)
        (.expectMsgPF test-kit "should get cancel order as save"
                      (reify Function
                        (apply [this msg]
                          (is (instance? Cancel msg))
                          (is (= (inc (.getPositionId get)) (.getId msg)))
                          (is (= (:order-id item) (.getOrderId msg)))
                          (is (= (:account-id item) (.getAccountId msg)))
                          (is (= (:symbol item) (.getSymbol msg))))))))
    (TestKit/shutdownActorSystem system)))

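Clearly, changing the host variable is all it takes to point this set of tests at a remote service. After deploying a new version of the service remotely, I also run these tests to confirm that the deployment is healthy.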
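
One way to avoid editing the test source for every target is to resolve the host from the environment. The following is only a minimal sketch, not the project's actual code: the namespace, the system property name counter.host, the environment variable COUNTER_HOST, and the fallback address are all hypothetical names chosen for illustration.

```clojure
(ns example.test-config
  (:require [clojure.string :as str]))

(defn resolve-host
  "Return the first non-blank candidate, or a local fallback.
   With no arguments, the candidates are the (hypothetical) system
   property counter.host and environment variable COUNTER_HOST."
  ([]
   (resolve-host (System/getProperty "counter.host")
                 (System/getenv "COUNTER_HOST")))
  ([prop env]
   ;; str/blank? is true for nil, \"\" and whitespace-only strings,
   ;; so unset or empty settings fall through to the next candidate.
   (or (first (remove str/blank? [prop env]))
       "127.0.0.1")))

;; The test namespace could then bind its host once at load time.
(def host (resolve-host))
```

With something like this in place, the same test run can target a local actor system by default and a remote deployment when the property or variable is set, without touching the test code itself.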

In the chapters that follow, we move on to the matching service.

========================

Project source code: MarchLiu/market
