Conversation
|
|
||
| def toOptionSeqStringSeqEntry[K, F, V](mb: MultiBulk)(implicit deserializerK: ByteStringDeserializer[K], deserializerF: ByteStringDeserializer[F], deserializerV: ByteStringDeserializer[V]): Option[Seq[(K, Seq[StreamEntry[F, V]])]] = | ||
| mb.responses.map { r => | ||
| r.map(_.asInstanceOf[MultiBulk]).map(toStringSeqEntry[K,F,V]) |
There was a problem hiding this comment.
you could squeeze some perf by doing map only once
r.map{ case mb: MultiBulk => toStringSeqEntry[K,F,V](mb)}
There was a problem hiding this comment.
Good point. I'll include this in my next pull request.
| StreamEntry(id, fields) | ||
| } | ||
|
|
||
| def toSeqEntry[F, V](mb: MultiBulk)(implicit deserializerF: ByteStringDeserializer[F], deserializerV: ByteStringDeserializer[V]): Seq[StreamEntry[F, V]] = { |
There was a problem hiding this comment.
relatively similar to MultiBulkConverter.toSeqByteString
There was a problem hiding this comment.
While the method signature is similar, the underlying reply from Redis is significantly different.
The Stream commands use nested arrays in their replies to a much greater extent than commands for the other data structures. For example, XRANGE returns an array of two-element arrays, where the first element is the stream id and the second element is an array of field-value pairs. Borrowing JSON notation, this would be something like:
[
[ ID1, [ FIELD1, VALUE1, FIELD2, VALUE2, ... ] ],
[ ID2, [ FIELD1, VALUE1, FIELD2, VALUE2, ... ] ],
...
]where ID, FIELD and VALUE are all Bulk strings and [] indicates a MultiBulk array.
The XREAD reply is even more nested, with an array containing two element arrays where the first element is the stream key and the second element is a sequence of entries, similar to the XRANGE reply.
Because the replies are structured in such specific ways for Stream comamnds, I opted to put the decoding logic in a separate object (StreamEntryDecoder) rather than adding it to MultiBulkConverter.
I opted to use unsafe operations (casting to MultiBulk, accessing elements by index) for two reasons. First, I think it's best for decoding to break immediately and loudly if we get a reply that doesn't match our understanding/implementation of the Redis spec. Second, I don't see good fallback options, aside from silently dropping parts of the response entirely. Throwing an exception seems like the best response. It seems like there is precedent here in the use of head/tail in MultiBulkConverter.seqtoMapString and MultiBulkConverter.toOptionStringByteString, which will also throw exceptions if the array is too small.
How would you feel about introducing a new exception specifically for decode errors? This feels cleaner than emitting low-level exceptions like ClassCastException, NoSuchElementException, etc., and would allow callers to implement custom decode error handling if desired. I thought about reusing ReplyErrorException for this, but that seems best reserved for error messages from the Redis server itself. Perhaps ReplyDecodeException?
|
|
||
| private [redis] object StreamEntryDecoder { | ||
| def toEntry[F, V](mb: MultiBulk)(implicit deserializerF: ByteStringDeserializer[F], deserializerV: ByteStringDeserializer[V]): StreamEntry[F, V] = { | ||
| val r = mb.responses.get |
There was a problem hiding this comment.
You are using "unsafe" method here and there (.get, r(1) (seq.apply(index))
I don't know how safe it is to do it here.
Maybe you could try to compare with what we did in this file
There was a problem hiding this comment.
Please refer to the comment above for the reasoning behind usage of "unsafe" methods.
Hi there!
As noted in my previous pull request, I've made some changes to support redis 5.0 and the new Streams commands. This branch is based on my features/redis-4.0 branch, so please disregard the common changes. (I'll rebase this if the other pull request is accepted.)
This is still a work-in-progress, but I think there is enough here to get meaningful feedback, especially on the interface and reply decoding for stream commands. If the current approach looks good, I'll add support for consumer groups and blocking stream commands.
Thanks,
-David