提问者:小点点

Clojure中巨型JSON的并行处理


我们的数据来自DB,我们需要在发送之前应用一些业务逻辑,所以我们将数据转换为Clojure map格式进行处理。数据是多级嵌套的map,我们必须处理所有级别map中的每个键和值,对于处理我们使用Clojure. walk.postwalk。因为数据庞大,所以需要更多的时间。

在数据中,第一级包含大约5个键,每个键的值可能是另一个映射或向量。同样,它可能会进入10到15个级别。我们在第一级尝试了pmap,但速度很慢。如果数据是简单的向量,我们可以使用分区,但由于嵌套的复杂结构,很难使用分区。

有没有无论如何让这个过程更快,基本上我们的要求是对每个键应用一个函数,对每个值应用一个单独的函数。


共3个答案

匿名用户

我很幸运地使用了未来的两遍方法。基本上,你遍历整个树一次,将每个转换包装在未来中。然后你第二次遍历树,deref划分每个未来。我认为两遍会太贵,但我用一个相当大的嵌套树尝试了一下,它比仅仅使用postwalk要快得多。

我使用的测试用例是找到第n个素数来模拟一个昂贵的操作。树是关键字/数字对的嵌套映射。所有找到的数字都转换为找到的第250个素数。

我使用的测试数据是这样的混乱:

(def giant-tree
  {:a 28,
   :e {:d {:a 37,
           :e 92,
           :d {:b {:c 91,
                   :d {:e 12,
                       :a 22,
                       :d {:e {:a {:a 53}, :d 98},
                           :d {:b 23,
                               :a {:a {:a 97},
                                   :c {:c 47,
                                       :d {:c {:d {}},
                                           :e {:e 57,
                                               :d {:a 57,
                                                   :d 42,
                                                   :e {:d {:e 64,
                                                           :a {:d {:b 14,
                                                                   :d {:c {},
                                                                       :b {},
                                                                       :a {:b {:b 86,
                                                                               :a {:d 86, :c 52},
                                                                               :d {:d {:a {},
                                                                                       :c {:a {}, :c 0, :b {:c 29}},
                                                                                       :d 88},
                                                                                   :c {:c 88},
                                                                                   :a {:c 89, :a {:a 42, :c 62}},
                                                                                   :b 30},
                                                                               :e 60},
                                                                           :c {:e 18,
                                                                               :d {:e {}, :d 70, :b 90},
                                                                               :b {:a {:a 1}}}}},
                                                                   :e 47,
                                                                   :c 19},
                                                               :c {:a 56,
                                                                   :c {:a {:a 73,
                                                                           :e 39,
                                                                           :d 21,
                                                                           :b {:e {:d {}, :b 82, :c 12, :a 80},
                                                                               :a {:a 22,
                                                                                   :e {:b {:b {:b 20, :a 50}}, :c 23},
                                                                                   :b 55,
                                                                                   :d 80},
                                                                               :c 13}},
                                                                       :e 15},
                                                                   :b 68,
                                                                   :d 58},
                                                               :a 49},
                                                           :b 5},
                                                       :c 38}},
                                               :a {:a {:d 35, :a 99}},
                                               :c {:d {}}},
                                           :b {},
                                           :d 95}}},
                               :d {:b {:c 99}, :c 83, :e 61, :d 55},
                               :c {:b {:c 97,
                                       :a {:a {:b 86, :a {}, :e {:a 52, :c 20, :e 20}, :d 49}, :c 62},
                                       :d {:c 97,
                                           :d {:d {:d {:a 46, :c 90, :d {}, :e 88}, :e {:a 14, :c 48}},
                                               :c {},
                                               :a 87,
                                               :e 66}},
                                       :e 9}}}},
                       :b 64},
                   :a 4,
                   :e 19},
               :a {},
               :e 9}}}})

我正在使用Criterium进行基准测试。

这是我正在测试的代码:

(ns fast-tree-transform.fast-tree-transform
  (:require [fast-tree-transform.test-data :as td]

            [clojure.walk :as w]

            [criterium.core :as c]))

(def default-price 250)

(defn prime? [n]
  (not
    (or (zero? n)
        (some #(zero? (rem n %)) (range 2 n)))))

(defn nth-prime [n]
  (nth (filter prime? (range))
       n))

(defn expensive-transform [e]
  (if (number? e)
    (nth-prime default-price)
    e))

; ----- Simple usage without any parallel aspect
(defn transform-data [nested-map]
  (w/postwalk expensive-transform nested-map))

; ----- Puts each call in a future so it's run in a thread pool
(defn future-transform [e]
  (if (number? e)
    (future (expensive-transform e))
    e))

; ----- The second pass to resolve each future
(defn resolve-transform [e]
  (if (future? e)
    @e
    e))

; ----- Tie them both together
(defn future-transform-data [nested-map]
  (->> nested-map
      (w/postwalk future-transform)
      (w/postwalk resolve-transform)))

感兴趣的两个主要函数是transport-datafoure-transport-data

以下是结果:

(c/bench
  (transform-data td/giant-tree))

Evaluation count : 60 in 60 samples of 1 calls.
             Execution time mean : 1.085124 sec
    Execution time std-deviation : 38.049523 ms
   Execution time lower quantile : 1.062980 sec ( 2.5%)
   Execution time upper quantile : 1.193548 sec (97.5%)
                   Overhead used : 3.088370 ns

Found 4 outliers in 60 samples (6.6667 %)
    low-severe   4 (6.6667 %)
 Variance from outliers : 22.1802 % Variance is moderately inflated by outliers

(c/bench
  (future-transform-data td/giant-tree))

Evaluation count : 120 in 60 samples of 2 calls.
             Execution time mean : 526.771107 ms
    Execution time std-deviation : 14.202895 ms
   Execution time lower quantile : 513.002517 ms ( 2.5%)
   Execution time upper quantile : 568.856393 ms (97.5%)
                   Overhead used : 3.088370 ns

Found 5 outliers in 60 samples (8.3333 %)
    low-severe   1 (1.6667 %)
    low-mild     4 (6.6667 %)
 Variance from outliers : 14.1940 % Variance is moderately inflated by outliers

你可以看到它的速度是原来的两倍。

匿名用户

根据数据的性质(例如,第一级键的数量以及这些键下嵌套的平衡程度)和您的硬件(CPU核心的数量),您尝试过的方法(第一级的pmap)可能是您能做的最好的方法。

在嵌套映射结构上并行化的一种相对简单的方法本质上只是“扁平化”映射,以便每个键实际上是表示值路径(原始嵌套映射中的叶子)的键向量。例如:

(defn extract-keys
  "Returns a seq of vectors that are the paths of keys to the leaves of map m."
  [m]
  (mapcat (fn [[k v]]
            (if (map? v)
              (map #(cons k %)
                   (extract-keys v))
              [[k]]))
          m))

(def data {:a {:b {:c {:d [1 2] :e [3 4 5 6]}
                   :f [7]}
               :g [8 9 10]}
           :h [11 12 13 14 15 16]})

;; Prints ((:a :b :c :d) (:a :b :c :e) (:a :b :f) (:a :g) [:h])
(println (extract-keys data))

然后,您可以在这个扁平化的地图上使用pmap

(defn- map-leaves
  [f m]
  (->> (extract-keys m)
       (pmap #(vector % (f (get-in data %))))
       (reduce (fn [m [k v]]
                 (assoc-in m k v))
               {})))

;; Prints {:a {:b {:c {:d 3, :e 18}, :f 7}, :g 27}, :h 81}
(println (map-leaves #(apply + %) data))

这可以直接修改为改变键(以及值),或者在pmap之前对[k v]对进行分区以减少并行化开销。当然,扁平化/非扁平化也会有相当多的开销,因此这是否会比您已经尝试过的更快取决于数据的性质、硬件和转换。

匿名用户

您可以使用https://github.com/clojure/data.jsonjson/read-str将执行该技巧。您可以从db. no?:)以字符串形式发送数据,并且您可以再次使用pr-str