Skip to content

Commit 28e4abc

Browse files
authored
Fix quic write (#342)
* cmake preset vcpkg Signed-off-by: turuslan <turuslan.devbox@gmail.com> * update vcpkg baseline Signed-off-by: turuslan <turuslan.devbox@gmail.com> * simplify read Signed-off-by: turuslan <turuslan.devbox@gmail.com> * fix quic version Signed-off-by: turuslan <turuslan.devbox@gmail.com> * batch defer process Signed-off-by: turuslan <turuslan.devbox@gmail.com> * fix write Signed-off-by: turuslan <turuslan.devbox@gmail.com> * include Signed-off-by: turuslan <turuslan.devbox@gmail.com> * pr comment Signed-off-by: turuslan <turuslan.devbox@gmail.com> --------- Signed-off-by: turuslan <turuslan.devbox@gmail.com>
1 parent 347eb8d commit 28e4abc

File tree

7 files changed

+137
-52
lines changed

7 files changed

+137
-52
lines changed

CMakePresets.json

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
{
2+
"version": 2,
3+
"configurePresets": [
4+
{
5+
"name": "vcpkg",
6+
"generator": "Ninja",
7+
"binaryDir": "${sourceDir}/build",
8+
"cacheVariables": {
9+
"CMAKE_TOOLCHAIN_FILE": "$env{VCPKG_ROOT}/scripts/buildsystems/vcpkg.cmake",
10+
"CMAKE_BUILD_TYPE": "Debug"
11+
}
12+
}
13+
]
14+
}

include/libp2p/transport/quic/engine.hpp

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <lsquic.h>
1010
#include <boost/asio/ip/udp.hpp>
1111
#include <boost/asio/steady_timer.hpp>
12+
#include <deque>
1213
#include <libp2p/multi/multiaddress.hpp>
1314
#include <libp2p/peer/peer_id.hpp>
1415
#include <memory>
@@ -75,14 +76,9 @@ namespace libp2p::transport::lsquic {
7576
Engine *engine;
7677
lsquic_stream_t *ls_stream;
7778
std::weak_ptr<QuicStream> stream{};
78-
/**
79-
* Stream read operation arguments.
80-
*/
81-
struct Reading {
82-
BytesOut out;
83-
std::function<void(outcome::result<size_t>)> cb;
84-
};
85-
std::optional<Reading> reading{};
79+
std::optional<std::function<void()>> reading{};
80+
std::optional<std::function<void()>> writing{};
81+
bool want_flush = false;
8682
};
8783

8884
using OnAccept = std::function<void(std::shared_ptr<QuicConnection>)>;
@@ -118,9 +114,11 @@ namespace libp2p::transport::lsquic {
118114
void onAccept(OnAccept cb) {
119115
on_accept_ = std::move(cb);
120116
}
121-
void process();
117+
void wantProcess();
118+
void wantFlush(StreamCtx *stream_ctx);
122119

123120
private:
121+
void process();
124122
void readLoop();
125123

126124
std::shared_ptr<boost::asio::io_context> io_context_;
@@ -134,6 +132,8 @@ namespace libp2p::transport::lsquic {
134132
lsquic_engine_t *engine_ = nullptr;
135133
OnAccept on_accept_;
136134
bool started_ = false;
135+
std::deque<std::weak_ptr<connection::QuicStream>> want_flush_;
136+
bool want_process_ = false;
137137
std::optional<Connecting> connecting_;
138138
struct Reading {
139139
static constexpr size_t kMaxUdpPacketSize = 64 << 10;

include/libp2p/transport/quic/stream.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,15 @@ namespace libp2p::transport {
1313
} // namespace libp2p::transport
1414

1515
namespace libp2p::transport::lsquic {
16+
class Engine;
1617
struct StreamCtx;
1718
} // namespace libp2p::transport::lsquic
1819

1920
namespace libp2p::connection {
2021
class QuicStream : public Stream,
2122
public std::enable_shared_from_this<QuicStream> {
23+
friend class libp2p::transport::lsquic::Engine;
24+
2225
public:
2326
QuicStream(std::shared_ptr<transport::QuicConnection> conn,
2427
transport::lsquic::StreamCtx *stream_ctx,

src/transport/quic/engine.cpp

Lines changed: 70 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ namespace libp2p::transport::lsquic {
4545

4646
lsquic_engine_settings settings{};
4747
lsquic_engine_init_settings(&settings, flags);
48+
settings.es_versions = 1 << LSQVER_I001;
4849
settings.es_init_max_stream_data_bidi_remote =
4950
mux_config.maximum_window_size;
5051
settings.es_init_max_stream_data_bidi_local =
@@ -58,10 +59,10 @@ namespace libp2p::transport::lsquic {
5859

5960
static lsquic_stream_if stream_if{};
6061
stream_if.on_new_conn = +[](void *void_self, lsquic_conn_t *conn) {
61-
auto self = static_cast<Engine *>(void_self);
62+
auto *self = static_cast<Engine *>(void_self);
6263
auto op = qtils::optionTake(self->connecting_);
6364
// NOLINTNEXTLINE(cppcoreguidelines-owning-memory)
64-
auto conn_ctx = new ConnCtx{self, conn, std::move(op)};
65+
auto *conn_ctx = new ConnCtx{self, conn, std::move(op)};
6566
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
6667
auto _conn_ctx = reinterpret_cast<lsquic_conn_ctx_t *>(conn_ctx);
6768
lsquic_conn_set_ctx(conn, _conn_ctx);
@@ -72,7 +73,7 @@ namespace libp2p::transport::lsquic {
7273
};
7374
stream_if.on_conn_closed = +[](lsquic_conn_t *conn) {
7475
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
75-
auto conn_ctx = reinterpret_cast<ConnCtx *>(lsquic_conn_get_ctx(conn));
76+
auto *conn_ctx = reinterpret_cast<ConnCtx *>(lsquic_conn_get_ctx(conn));
7677
if (auto op = qtils::optionTake(conn_ctx->connecting)) {
7778
op->cb(QuicError::CONN_CLOSED);
7879
}
@@ -85,7 +86,7 @@ namespace libp2p::transport::lsquic {
8586
};
8687
stream_if.on_hsk_done = +[](lsquic_conn_t *conn, lsquic_hsk_status status) {
8788
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
88-
auto conn_ctx = reinterpret_cast<ConnCtx *>(lsquic_conn_get_ctx(conn));
89+
auto *conn_ctx = reinterpret_cast<ConnCtx *>(lsquic_conn_get_ctx(conn));
8990
auto self = conn_ctx->engine;
9091
auto ok = status == LSQ_HSK_OK or status == LSQ_HSK_RESUMED_OK;
9192
auto op = qtils::optionTake(conn_ctx->connecting);
@@ -122,12 +123,12 @@ namespace libp2p::transport::lsquic {
122123
}
123124
};
124125
stream_if.on_new_stream = +[](void *void_self, lsquic_stream_t *stream) {
125-
auto self = static_cast<Engine *>(void_self);
126+
auto *self = static_cast<Engine *>(void_self);
126127
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
127-
auto conn_ctx = reinterpret_cast<ConnCtx *>(
128+
auto *conn_ctx = reinterpret_cast<ConnCtx *>(
128129
lsquic_conn_get_ctx(lsquic_stream_conn(stream)));
129130
// NOLINTNEXTLINE(cppcoreguidelines-owning-memory)
130-
auto stream_ctx = new StreamCtx{self, stream};
131+
auto *stream_ctx = new StreamCtx{self, stream};
131132
if (auto conn = conn_ctx->conn.lock()) {
132133
auto stream = std::make_shared<QuicStream>(
133134
conn, stream_ctx, conn_ctx->new_stream.has_value());
@@ -146,29 +147,36 @@ namespace libp2p::transport::lsquic {
146147
stream_if.on_close =
147148
+[](lsquic_stream_t *stream, lsquic_stream_ctx_t *_stream_ctx) {
148149
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
149-
auto stream_ctx = reinterpret_cast<StreamCtx *>(_stream_ctx);
150-
if (auto op = qtils::optionTake(stream_ctx->reading)) {
151-
op->cb(QuicError::STREAM_CLOSED);
152-
}
150+
auto *stream_ctx = reinterpret_cast<StreamCtx *>(_stream_ctx);
153151
if (auto stream = stream_ctx->stream.lock()) {
154152
stream->onClose();
155153
}
154+
if (auto reading = qtils::optionTake(stream_ctx->reading)) {
155+
reading.value()();
156+
}
157+
if (auto writing = qtils::optionTake(stream_ctx->writing)) {
158+
writing.value()();
159+
}
156160
// NOLINTNEXTLINE(cppcoreguidelines-owning-memory)
157161
delete stream_ctx;
158162
};
159163
stream_if.on_read =
160164
+[](lsquic_stream_t *stream, lsquic_stream_ctx_t *_stream_ctx) {
161165
lsquic_stream_wantread(stream, 0);
162166
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
163-
auto stream_ctx = reinterpret_cast<StreamCtx *>(_stream_ctx);
164-
auto op = qtils::optionTake(stream_ctx->reading).value();
165-
auto n = lsquic_stream_read(stream, op.out.data(), op.out.size());
166-
outcome::result<size_t> r = QuicError::STREAM_CLOSED;
167-
if (n > 0) {
168-
r = n;
167+
auto *stream_ctx = reinterpret_cast<StreamCtx *>(_stream_ctx);
168+
if (auto reading = qtils::optionTake(stream_ctx->reading)) {
169+
reading.value()();
170+
}
171+
};
172+
stream_if.on_write =
173+
+[](lsquic_stream_t *stream, lsquic_stream_ctx_t *_stream_ctx) {
174+
lsquic_stream_wantwrite(stream, 0);
175+
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
176+
auto *stream_ctx = reinterpret_cast<StreamCtx *>(_stream_ctx);
177+
if (auto writing = qtils::optionTake(stream_ctx->writing)) {
178+
writing.value()();
169179
}
170-
post(*stream_ctx->engine->io_context_,
171-
[cb{std::move(op.cb)}, r] { cb(r); });
172180
};
173181

174182
lsquic_engine_api api{};
@@ -179,7 +187,7 @@ namespace libp2p::transport::lsquic {
179187
api.ea_packets_out = +[](void *void_self,
180188
const lsquic_out_spec *out_spec,
181189
unsigned n_packets_out) {
182-
auto self = static_cast<Engine *>(void_self);
190+
auto *self = static_cast<Engine *>(void_self);
183191
// https://github.com/cbodley/nexus/blob/d1d8486f713fd089917331239d755932c7c8ed8e/src/socket.cc#L218
184192
int r = 0;
185193
for (auto &spec : std::span{out_spec, n_packets_out}) {
@@ -216,7 +224,7 @@ namespace libp2p::transport::lsquic {
216224
};
217225
api.ea_packets_out_ctx = this;
218226
api.ea_get_ssl_ctx = +[](void *void_self, const sockaddr *) {
219-
auto self = static_cast<Engine *>(void_self);
227+
auto *self = static_cast<Engine *>(void_self);
220228
return self->ssl_context_->native_handle();
221229
};
222230

@@ -261,7 +269,7 @@ namespace libp2p::transport::lsquic {
261269
if (auto op = qtils::optionTake(connecting_)) {
262270
op->cb(QuicError::CANT_CREATE_CONNECTION);
263271
}
264-
process();
272+
wantProcess();
265273
}
266274

267275
outcome::result<std::shared_ptr<QuicStream>> Engine::newStream(
@@ -281,7 +289,47 @@ namespace libp2p::transport::lsquic {
281289
return stream;
282290
}
283291

292+
void Engine::wantProcess() {
293+
if (want_process_) {
294+
return;
295+
}
296+
want_process_ = true;
297+
boost::asio::post(*io_context_, [weak_self{weak_from_this()}] {
298+
if (auto self = weak_self.lock()) {
299+
self->process();
300+
}
301+
});
302+
}
303+
304+
void Engine::wantFlush(StreamCtx *stream_ctx) {
305+
if (stream_ctx->want_flush) {
306+
return;
307+
}
308+
stream_ctx->want_flush = true;
309+
if (stream_ctx->stream.expired()) {
310+
return;
311+
}
312+
want_flush_.emplace_back(stream_ctx->stream);
313+
wantProcess();
314+
}
315+
284316
void Engine::process() {
317+
want_process_ = false;
318+
auto want_flush = std::exchange(want_flush_, {});
319+
for (auto &weak_stream : want_flush) {
320+
auto stream = weak_stream.lock();
321+
if (not stream) {
322+
continue;
323+
}
324+
if (stream->stream_ctx_ == nullptr) {
325+
continue;
326+
}
327+
if (stream->stream_ctx_->ls_stream == nullptr) {
328+
continue;
329+
}
330+
stream->stream_ctx_->want_flush = false;
331+
lsquic_stream_flush(stream->stream_ctx_->ls_stream);
332+
}
285333
lsquic_engine_process_conns(engine_);
286334
int us = 0;
287335
if (not lsquic_engine_earliest_adv_tick(engine_, &us)) {

src/transport/quic/stream.cpp

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,15 @@ namespace libp2p::connection {
3434
}
3535
auto n = lsquic_stream_read(stream_ctx_->ls_stream, out.data(), out.size());
3636
if (n == -1 and errno == EWOULDBLOCK) {
37-
stream_ctx_->reading.emplace(StreamCtx::Reading{out, std::move(cb)});
37+
stream_ctx_->reading.emplace(
38+
[weak_self{weak_from_this()}, out, cb{std::move(cb)}]() mutable {
39+
auto self = weak_self.lock();
40+
if (not self) {
41+
cb(QuicError::STREAM_CLOSED);
42+
return;
43+
}
44+
self->readSome(out, std::move(cb));
45+
});
3846
lsquic_stream_wantread(stream_ctx_->ls_stream, 1);
3947
return;
4048
}
@@ -54,11 +62,31 @@ namespace libp2p::connection {
5462
if (not stream_ctx_) {
5563
return cb(r);
5664
}
65+
if (stream_ctx_->writing) {
66+
throw std::logic_error{"QuicStream::writeSome already in progress"};
67+
}
68+
// Missing from `lsquic_stream_write` documentation comment.
69+
// Return value 0 means buffer is full.
70+
// Call `lsquic_stream_wantwrite` and wait for `stream_if.on_write`
71+
// callback, before calling `lsquic_stream_write` again.
5772
auto n = lsquic_stream_write(stream_ctx_->ls_stream, in.data(), in.size());
58-
if (n > 0 and lsquic_stream_flush(stream_ctx_->ls_stream) == 0) {
73+
if (n == 0) {
74+
stream_ctx_->writing.emplace(
75+
[weak_self{weak_from_this()}, in, cb{std::move(cb)}]() mutable {
76+
auto self = weak_self.lock();
77+
if (not self) {
78+
cb(QuicError::STREAM_CLOSED);
79+
return;
80+
}
81+
self->writeSome(in, std::move(cb));
82+
});
83+
lsquic_stream_wantwrite(stream_ctx_->ls_stream, 1);
84+
return;
85+
}
86+
if (n > 0) {
5987
r = n;
88+
stream_ctx_->engine->wantFlush(stream_ctx_);
6089
}
61-
stream_ctx_->engine->process();
6290
deferReadCallback(r, std::move(cb));
6391
}
6492

test/libp2p/transport/quic_test.cpp

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
#include <libp2p/basic/read.hpp>
1010
#include <libp2p/basic/write.hpp>
1111
#include <libp2p/injector/host_injector.hpp>
12-
#include <qtils/bytestr.hpp>
1312

1413
#include "testutil/prepare_loggers.hpp"
1514

@@ -19,8 +18,6 @@ using libp2p::Multiaddress;
1918
using libp2p::StreamAndProtocol;
2019
using libp2p::StreamAndProtocolOrError;
2120
using libp2p::connection::Stream;
22-
using qtils::byte2str;
23-
using qtils::str2byte;
2421

2522
auto makeInjector(std::shared_ptr<io_context> io) {
2623
return libp2p::injector::makeHostInjector<
@@ -54,7 +51,9 @@ struct Peer {
5451
TEST(Quic, Test) {
5552
testutil::prepareLoggers();
5653
std::string protocol = "/test";
57-
std::string_view req{"request"}, res{"response"};
54+
const size_t size = 1 << 20;
55+
libp2p::Bytes req(size, 'a'), res(size, 'b');
56+
5857
auto io = std::make_shared<io_context>();
5958
auto run = [&] {
6059
io->restart();
@@ -94,16 +93,16 @@ TEST(Quic, Test) {
9493
}
9594

9695
wait_count = 2;
97-
qtils::Bytes req_out(req.size());
98-
libp2p::write(client.stream, str2byte(req), RW_CB);
96+
libp2p::Bytes req_out(req.size());
97+
libp2p::write(client.stream, req, RW_CB);
9998
libp2p::read(server.stream, req_out, RW_CB);
10099
run();
101-
EXPECT_EQ(byte2str(req_out), req);
100+
EXPECT_EQ(req_out, req);
102101

103102
wait_count = 2;
104-
qtils::Bytes res_out(res.size());
103+
libp2p::Bytes res_out(res.size());
105104
libp2p::read(client.stream, res_out, RW_CB);
106-
libp2p::write(server.stream, str2byte(res), RW_CB);
105+
libp2p::write(server.stream, res, RW_CB);
107106
run();
108-
EXPECT_EQ(byte2str(res_out), res);
107+
EXPECT_EQ(res_out, res);
109108
}

vcpkg-configuration.json

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,9 @@
11
{
22
"default-registry": {
33
"kind": "git",
4-
"baseline": "fe1cde61e971d53c9687cf9a46308f8f55da19fa",
4+
"baseline": "897ba2ab4c4c776b985ab1f599548fcf3ae598ba",
55
"repository": "https://github.com/microsoft/vcpkg"
66
},
7-
"registries": [
8-
{
9-
"kind": "artifact",
10-
"location": "https://github.com/microsoft/vcpkg-ce-catalog/archive/refs/heads/main.zip",
11-
"name": "microsoft"
12-
}
13-
],
147
"overlay-ports": [
158
"vcpkg-overlay"
169
]

0 commit comments

Comments
 (0)