If you use KunQuant in online services, when the data for each tick are received one by one, you may need the "streaming" mode.
It is almost the same as the steps in Customize.md and Readme.md. The main difference is that you need to specify `output_layout="STREAM"` in the `generate.py` of your factor library generator. `projects/Alpha101Stream` is an example of Alpha101 in streaming mode. You can compare `projects/Alpha101/generate.py` with `projects/Alpha101Stream/generate.py`. Aside from the names, the only difference is the line

```python
src = compileit(f, "alpha_101_stream", partition_factor=8, output_layout="STREAM", options={"opt_reduce": False, "fast_log": True})
```

We specify a different `partition_factor` for performance and turn off the `opt_reduce` optimization. We also tell KunQuant to compile in streaming mode via `output_layout="STREAM"`.
We can build it via the command

```shell
cmake --build . --target Alpha101Stream
```

For more info on building customized factors, see Customize.md.
Load the compiled factor library as usual:

```python
import KunRunner as kr

lib = kr.Library.load("./projects/libAlpha101Stream.so")
modu = lib.getModule("alpha_101_stream")
```

Create the executor (a multi-thread executor is also supported). Assume we have 16 stocks (the number of stocks must be a multiple of 8) and create a streaming context:
```python
num_stock = 16
executor = kr.createSingleThreadExecutor()
stream = kr.StreamContext(executor, modu, num_stock)
```

Query the buffer handles. You need to cache the handles:
```python
buffer_name_to_id = dict()
for name in ["high", "low", "close", "open", "volume", "amount"]:
    buffer_name_to_id[name] = stream.queryBufferHandle(name)
for name in ["alpha001", "alpha002"]:  # and other factors
    buffer_name_to_id[name] = stream.queryBufferHandle(name)
```

We define a function to process each tick of new data. The data should contain "high", "low", "close", "open", "volume" and "amount" for each of the stocks. Each parameter should be a numpy array of length `num_stock`, holding the data of each stock.
```python
import numpy as np

def on_tick(high, low, close, open, volume, amount):
    # each argument should be an ndarray of shape (16,)
    stream.pushData(buffer_name_to_id["high"], high)
    stream.pushData(buffer_name_to_id["low"], low)
    stream.pushData(buffer_name_to_id["close"], close)
    stream.pushData(buffer_name_to_id["open"], open)
    stream.pushData(buffer_name_to_id["volume"], volume)
    stream.pushData(buffer_name_to_id["amount"], amount)
    stream.run()
    alpha001: np.ndarray = stream.getCurrentBuffer(buffer_name_to_id["alpha001"])[:]
    alpha002: np.ndarray = stream.getCurrentBuffer(buffer_name_to_id["alpha002"])[:]
    # do something with alpha001 and alpha002
```

Basically, you need to call `pushData` for each input, then call `run()` to let the stream step forward, and finally use `getCurrentBuffer` to get the result. A very important note: the ndarray returned by `getCurrentBuffer` is valid only
- before the next call of `pushData` or `run` on the same stream
- before the stream context object is deleted
That's why in the above code, we immediately copy the ndarray returned by `getCurrentBuffer` with `[:]`.
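For illustration, here is a synthetic tick generator (pure NumPy, independent of KunQuant) that produces per-tick arrays in the shape `on_tick` expects. The value ranges are arbitrary, and the float32 dtype is an assumption here; match the datatype your own factor library was compiled with.

```python
import numpy as np

def make_synthetic_tick(num_stock: int, rng: np.random.Generator) -> dict:
    """Generate one fake tick: one value per stock for each input buffer."""
    base = rng.uniform(10.0, 100.0, size=num_stock)
    tick = {
        "open": base,
        "close": base * rng.uniform(0.97, 1.03, size=num_stock),
        "high": base * rng.uniform(1.00, 1.05, size=num_stock),
        "low": base * rng.uniform(0.95, 1.00, size=num_stock),
        "volume": rng.uniform(1e3, 1e6, size=num_stock),
        "amount": rng.uniform(1e5, 1e8, size=num_stock),
    }
    # float32 is an assumption; use the dtype of your own build.
    return {name: arr.astype("float32") for name, arr in tick.items()}

tick = make_synthetic_tick(16, np.random.default_rng(0))
# on_tick(**tick) would then push this tick into the stream
```

Each array has shape `(num_stock,)`, matching the per-stock layout described above.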
You can get the stream states from the context object via the `serializeStates` method after `run()` is called:

```python
stream = kr.StreamContext(executor, modu, num_stock)
stream.pushData(...)
...
stream.run()
states: bytes = stream.serializeStates()
```

If no arguments are given to `serializeStates()`, the method returns a `bytes` object representing a copy of the states of the stream. Alternatively, you can pass a string as a file path to `serializeStates(...)` to dump the states to a file:
```python
stream.serializeStates("path/to/output/file")  # None is returned
```

To resume from a previous state of the stream, call `kr.StreamContext` with an additional argument holding the bytes or the file path of the dumped states:

```python
stream = kr.StreamContext(executor, modu, num_stock, state_bytes_or_file_path)
```

**Important note:** The serialized states are sensitive to the
- KunQuant version
- OS/CPU architecture
- C++ compiler used to compile KunQuant
- number of stocks
- factor expressions
If two streams differ in any of the aspects above, they are incompatible. Do not feed a stream with stream states generated by another, incompatible stream.
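If you persist states across runs, it can help to store the compatibility-relevant metadata next to the serialized bytes and verify it before resuming. A minimal sketch (plain Python; the header layout and metadata fields are illustrative assumptions, not a KunQuant format):

```python
import json
import struct

def pack_states(states: bytes, meta: dict) -> bytes:
    """Prepend a length-prefixed JSON header describing the states."""
    header = json.dumps(meta, sort_keys=True).encode("utf-8")
    return struct.pack("<I", len(header)) + header + states

def unpack_states(blob: bytes, expected_meta: dict) -> bytes:
    """Verify the header matches before handing the states back."""
    (header_len,) = struct.unpack_from("<I", blob)
    header = json.loads(blob[4:4 + header_len].decode("utf-8"))
    if header != expected_meta:
        raise ValueError(f"incompatible stream states: {header} != {expected_meta}")
    return blob[4 + header_len:]

# meta should record everything the list above names: version, platform,
# number of stocks, and an identifier of the factor expressions.
meta = {"kunquant": "<your version>", "platform": "linux-x86_64",
        "num_stock": 16, "module": "alpha_101_stream"}
blob = pack_states(b"\x01\x02\x03", meta)  # b"\x01\x02\x03" stands in for stream.serializeStates()
assert unpack_states(blob, meta) == b"\x01\x02\x03"
```

When resuming, call `unpack_states` first and pass the returned bytes to `kr.StreamContext(executor, modu, num_stock, states)`.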
The logic of the C API is similar to the Python API above. For details, see `tests/capi/test_c.cpp` and `cpp/Kun/CApi.h`.