Starting the execution of a function on several kdb+ processes at the same time is an interesting topic worth investigating. While this requirement is not a common use case, it becomes essential in scenarios such as evaluating how storage performance scales under simultaneous disk reads - see the nano storage performance tool that triggered this small research.
The orchestration of the function execution is controlled by a central kdb+ process acting as a controller. The controller communicates with the worker processes that are responsible for executing the function. I considered two methods, namely qIPC (q Inter-Process Communication) and file operations, to transmit execution messages from the controller to the worker processes.
The worker processes are configured to listen on distinct addresses, such as host1:port1, host2:port2, and host3:port3. The goal is to execute a function, denoted as f, across these worker processes. Notably, function f does not require any parameters. The list of worker addresses is defined as A:
addr: ("host1:port1"; "host2:port2"; "host3:port3")
A: hsym `$addr
H: hopen each A
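For completeness, the sketch below shows one way the worker side might look. The file name worker.q, the port number and the body of f are illustrative assumptions - in the storage tool f performs disk reads; here it merely records a timestamp. f ignores its single argument so it can be used both with (`f;::) IPC calls and directly with peach.
/ worker.q (hypothetical file name) - start e.g.:  q worker.q -p 5001
f:{[ignore]
  start:.z.p;      / capture when execution begins
  / ... application-specific work would go here (e.g. disk reads) ...
  start }          / return the start timestamp to the caller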
Synchronization solutions encompass multiple options, including TCP/Unix Domain Socket messages, file-based mechanisms, and CPU counters. Let’s explore these approaches in detail.
peach-based solutions
One technique to achieve parallel execution is by leveraging the parallel each (peach) construct.
sync message
Sending synchronous messages within peach threads is a risky proposition due to potential message interleaving, as specified in the KX documentation:
A handle must not be used concurrently between threads as there is no locking around a socket descriptor, and the bytes being read/written from/to the socket will be garbage (due to message interleaving) and most likely result in a crash.
If we start the controller with as many secondary threads as there are worker processes, then each handle can have its own thread.
@[; (`f; ::)] peach H //Pass list of existing (H)andles
This method is denoted by peach sync in the Results section.
This approach is no longer supported in later kdb+ versions, including kdb+ Cloud Edition (a.k.a. KX Insights Core). See the kdb+ 4.1 release notes:
2021.03.30
NUC
using handles within peach is temporarily not supported, to be reviewed in the near future e.g.
q)H:hopen each 4#4000;{x""}peach H
3 4 5 6i
one-shot ipc requests can be used within peach instead.
one-shot request (peach one-shot)
In cases where the controller cannot be started with a sufficient number of threads, one-shot requests can be employed. This technique involves opening a connection, sending a synchronous message, and then closing the connection immediately. Unlike the previous approach, this method is safe to use within peach:
@[; (`f; ::)] peach A //Pass list of (A)ddresses
The controller kdb+ process must be started with a positive integer -s command line parameter.
This approach introduces the overhead of establishing and closing connections for each task, impacting overall efficiency.
peach handlers (peach handles)
peach handles are open connection handles to remote kdb+ processes to which kdb+ sends the function to execute during a peach statement. kdb+ assigns the tasks sequentially to the list of remote handles defined in .z.pd via synchronous message calls. If the length of the input list equals the number of remote processes, then kdb+ guarantees that each process gets exactly one request. The simplest way to guarantee this is to use .z.pd itself as the input list for peach; we don't use the values anyway.
.z.pd: `u#H
f peach .z.pd
The controller kdb+ process must be started with a negative integer -s command line parameter.
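Per the kdb+ reference, .z.pd may also be a function that returns the (`u-attributed) handle vector, which lets the connections be opened lazily when the first peach runs. A minimal sketch, as an alternative to the static assignment above; phs is a hypothetical global name and A is the address list defined earlier:
/ sketch only: open the handles on demand and cache them in a global
phs: `u#`int$()
.z.pd: {[x] $[count phs; phs; phs:: `u#hopen each A]}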
each-based solution
We can iterate over the list of hosts or connection handles with the each iterator. each starts the tasks sequentially, which means it waits until the previous task has finished. We need to make sure that starting a task, i.e. sending the start signal, does not block the sender. We can achieve this by sending asynchronous messages.
We implemented four async approaches (a combined sketch follows the list):
- each async: vanilla solution, i.e. sending the message via neg h.
- each async flush: the primary purpose of async messages is to avoid blocking the sender; it does not guarantee that the message is sent immediately. The actual sending is up to the operating system, which adds variance to the arrival times of the messages. We can reduce the sending latency by flushing the outgoing queue (neg[h][]).
- async broadcast flush: we send the same message to all workers, so we can avoid serializing it repeatedly by using an asynchronous broadcast (-25!). The benefit of async broadcast is more obvious with large messages, so we don't expect significant improvement in our case.
- each async timer: our original goal is not to reduce the latency of sending but to start the function at the same time. We can achieve this with a timer in the worker, using async messages only to set the target time. kdb+'s built-in timer is millisecond-based; if the workers are idle between task executions, we can instead spin the CPU and increase a counter until the target time is reached (a:0; while[.z.p<target;a+:1]).
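The sketch below summarizes the four variants on the controller side, plus the worker-side trigger for the timer approach. The function name start and the 5 ms lead time are illustrative assumptions, and the timer variant assumes the controller and worker clocks are comparable (e.g. same host).
/ each async: fire-and-forget via the negative (async) handle
{neg[x] (`f; ::)} each H
/ each async flush: same, but flush the socket so the OS sends immediately
{neg[x] (`f; ::); neg[x][]} each H
/ async broadcast flush: serialize the message once (-25!), then flush each handle
-25!(H; (`f; ::));
{neg[x][]} each H
/ each async timer: send each worker a target start time, then flush
target: .z.p + 0D00:00:00.005;                 / assumed 5 ms in the future
{neg[x] (`start; target); neg[x][]} each H
/ worker side (sketch): spin on a counter until the target time, then run f
start:{[t] a:0; while[.z.p < t; a+:1]; f[]}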
Another tool for parallel execution is the deferred response, generally used in implementations of kdb+ gateways. Deferred responses are used with synchronous messages to suspend the execution of the actual request and give the CPU to other requests. They do not help in our case because they free up the workers rather than the controller, which remains blocked on each synchronous call.
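For reference, the worker-side mechanics look roughly like the hedged sketch below (not part of the benchmark): the worker defers the reply with -30!(::) and sends it later with -30!(handle;isError;result). This frees the worker's main thread, but the controller issuing the synchronous call still waits for that reply.
/ worker-side sketch of a deferred response; pending is a hypothetical global
.z.pg:{[query]
  pending:: (.z.w; query);   / remember the client handle and the request
  -30!(::) }                 / defer: no response is sent when .z.pg returns
/ later, when the result is ready:
/ -30!(pending 0; 0b; value pending 1)    / send a non-error result to the client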
file-watching based solution (async InotifyWait)
An alternative synchronization method involves file monitoring. Processes can execute the function upon detecting specific file operations. If the worker processes are spread across different servers then you need a shared file system that supports monitoring.
Linux's inotify subsystem serves as the foundation for file-system monitoring. A kdb+ function utilizing the C API can wrap it. Alternatively, the inotifywait command (from the inotify-tools package) can be employed, albeit with less precision.
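A minimal sketch using the inotifywait command is shown below; the shared directory /shared/signal and the trigger file name are illustrative assumptions.
/ worker side: block until a file is created in the watched directory, then run f
waitAndRun:{[dir]
  system "inotifywait -q -e create ", dir;   / blocks until a create event occurs
  f[] }
waitAndRun "/shared/signal"
/ controller side: create the trigger file once all workers are waiting
(hsym `$"/shared/signal/go") 0: enlist "go"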
Results
To assess the efficacy of these methods, a series of tests was conducted, each executed 20 times to calculate the average, maximum, and standard deviation of the maximal delay in microseconds. The number of kdb+ workers equaled the system's core count, and each worker was pinned to a separate core via taskset.
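The exact harness belongs to the nano tool; one way the delay metric could be computed is to have each worker return the timestamp captured when it started executing (as in the worker sketch above) and take the spread on the controller. A hedged sketch, using the one-shot method as an example and assuming comparable worker clocks:
runs: 20
delays: {[run]
  starts: @[; (`f; ::)] peach A;                       / one run: collect each worker's start time
  (`long$ (max starts) - min starts) % 1000 } each til runs   / first-to-last spread in microseconds
(avg; max; dev) @\: delays     / average, maximum and standard deviation over the runs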
Azure VM
Specification:
- Standard D4ds v4 (4 vCPUs, 16 GiB memory)
- The inotifywait command used a local ephemeral disk for synchronization
- Number of workers: 4
Run 1:
method | medium | average | maximum | deviation |
---|---|---|---|---|
each async timer | cpu | 1 | 2 | 1 |
peach handles | uds | 6 | 70 | 15 |
peach sync | uds | 13 | 23 | 6 |
peach sync | tcp | 17 | 57 | 20 |
peach handles | tcp | 34 | 191 | 37 |
async InotifyWait | file | 35 | 165 | 40 |
each async | tcp | 39 | 99 | 16 |
peach one-shot | uds | 49 | 630 | 136 |
peach one-shot | tcp | 70 | 615 | 127 |
each async flush | tcp | 73 | 497 | 99 |
async broadcast flush | tcp | 91 | 827 | 170 |
each deferred | tcp | 757477 | 757669 | 179 |
Run 2:
method | medium | average | maximum | deviation |
---|---|---|---|---|
each async timer | cpu | 1 | 2 | 0 |
peach handles | uds | 8 | 21 | 6 |
peach sync | uds | 13 | 27 | 7 |
peach sync | tcp | 16 | 50 | 15 |
peach one-shot | uds | 41 | 390 | 91 |
each async | tcp | 44 | 118 | 22 |
peach one-shot | tcp | 66 | 232 | 64 |
async broadcast flush | tcp | 69 | 347 | 65 |
each async flush | tcp | 88 | 774 | 158 |
async InotifyWait | file | 397 | 7345 | 1594 |
peach handles | tcp | 588 | 11061 | 2403 |
each deferred | tcp | 757201 | 757307 | 82 |
HPE DL385
Specification:
- Processors: 2 × AMD EPYC 7763 (2.45 GHz), 128 cores in total
- Memory: 32 × 128 GiB quad-rank DDR4-3200 DIMMs
Number of workers: 4
Run 1:
method | medium | average | maximum | deviation |
---|---|---|---|---|
each async timer | cpu | 0 | 0 | 0 |
peach handles | uds | 4 | 6 | 1 |
peach sync | tcp | 20 | 49 | 10 |
peach one-shot | tcp | 23 | 47 | 8 |
each async | tcp | 27 | 33 | 4 |
peach one-shot | uds | 33 | 339 | 71 |
each async flush | tcp | 34 | 40 | 3 |
peach sync | uds | 34 | 277 | 56 |
async broadcast flush | tcp | 36 | 45 | 4 |
peach handles | tcp | 41 | 267 | 52 |
each deferred | tcp | 755951 | 756028 | 107 |
Run 2:
method | medium | average | maximum | deviation |
---|---|---|---|---|
each async timer | cpu | 0 | 0 | 0 |
peach handles | uds | 4 | 6 | 1 |
peach one-shot | tcp | 6 | 11 | 3 |
peach sync | tcp | 8 | 18 | 3 |
peach sync | uds | 18 | 30 | 7 |
peach one-shot | uds | 19 | 281 | 60 |
each async | tcp | 29 | 39 | 6 |
peach handles | tcp | 29 | 37 | 6 |
async broadcast flush | tcp | 33 | 40 | 3 |
each async flush | tcp | 45 | 226 | 42 |
each deferred | tcp | 756093 | 756153 | 72 |
Number of workers: 128
Run 1:
method | medium | average | maximum | deviation |
---|---|---|---|---|
each async timer | cpu | 1 | 2 | 0 |
peach sync | uds | 630 | 2154 | 629 |
peach handles | uds | 795 | 3015 | 830 |
peach sync | tcp | 942 | 8478 | 1837 |
peach one-shot | tcp | 1635 | 3260 | 1045 |
peach one-shot | uds | 2493 | 5078 | 835 |
peach handles | tcp | 2516 | 4999 | 751 |
each async flush | tcp | 3409 | 11316 | 2543 |
each async | tcp | 3483 | 13956 | 3394 |
async broadcast flush | tcp | 3791 | 13481 | 3461 |
each deferred | tcp | 32013787 | 32015348 | 1248 |
Run 2:
method | medium | average | maximum | deviation |
---|---|---|---|---|
each async timer | cpu | 1 | 2 | 1 |
peach sync | uds | 587 | 1591 | 517 |
peach handles | uds | 775 | 3963 | 942 |
peach sync | tcp | 1108 | 7144 | 1616 |
peach one-shot | tcp | 1714 | 4215 | 1308 |
peach one-shot | uds | 2193 | 4264 | 990 |
peach handles | tcp | 2505 | 4623 | 888 |
each async flush | tcp | 2986 | 11150 | 1983 |
async broadcast flush | tcp | 3196 | 12482 | 2213 |
each async | tcp | 3296 | 14126 | 2557 |
each deferred | tcp | 32014930 | 32016046 | 868 |
Conclusions
The timer-based solution offers unparalleled synchronization, but it keeps the CPUs spinning until function execution starts. Selecting the optimal trigger time requires careful consideration, accounting for hardware specifications and network latency. For applications demanding the utmost precision, this method is the optimal choice.
Following closely is the peach approach employing synchronous messages, a strong contender provided that a dedicated thread can be assigned to each worker. However, this approach is unsupported in kdb+ 4.1. In contrast, peach handles are fast and supported in all kdb+ versions.
For a combination of convenience, safety, and efficiency, one-shot requests are a viable solution: execution remains fast while avoiding the pitfalls encountered in the other methods.