Skip to content

Commit 994e1b2

Browse files
committed
track IO layer read buffers via MemTrackerLimiter using PODArray
1 parent 863419f commit 994e1b2

File tree

5 files changed

+25
-14
lines changed

5 files changed

+25
-14
lines changed

be/src/io/fs/buffered_reader.cpp

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,14 @@ void PrefetchBuffer::prefetch_buffer() {
449449
_prefetched.notify_all();
450450
}
451451

452+
// Lazy-allocate the backing buffer on first actual prefetch, avoiding the cost of
453+
// pre-allocating memory for readers that are initialized but never read (e.g. when
454+
// many file readers are created concurrently for a TVF scan over many small S3 files).
455+
if (_buf.empty()) {
456+
_buf.resize(_size);
457+
}
458+
459+
452460
int read_range_index = search_read_range(_offset);
453461
size_t buf_size;
454462
if (read_range_index == -1) {
@@ -463,7 +471,7 @@ void PrefetchBuffer::prefetch_buffer() {
463471

464472
{
465473
SCOPED_RAW_TIMER(&_statis.read_time);
466-
s = _reader->read_at(_offset, Slice {_buf.get(), buf_size}, &_len, _io_ctx);
474+
s = _reader->read_at(_offset, Slice {_buf.data(), buf_size}, &_len, _io_ctx);
467475
}
468476
if (UNLIKELY(s.ok() && buf_size != _len)) {
469477
// This indicates that the data size returned by S3 object storage is smaller than what we requested,
@@ -602,7 +610,7 @@ Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len,
602610
size_t read_len = std::min({buf_len, _offset + _size - off, _offset + _len - off});
603611
{
604612
SCOPED_RAW_TIMER(&_statis.copy_time);
605-
memcpy((void*)out, _buf.get() + (off - _offset), read_len);
613+
memcpy((void*)out, _buf.data() + (off - _offset), read_len);
606614
}
607615
*bytes_read = read_len;
608616
_statis.request_io += 1;

be/src/io/fs/buffered_reader.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
#include "common/status.h"
3131
#include "core/custom_allocator.h"
32+
#include "core/pod_array.h"
3233
#include "core/typeid_cast.h"
3334
#include "io/cache/cached_remote_file_reader.h"
3435
#include "io/file_factory.h"
@@ -436,7 +437,6 @@ struct PrefetchBuffer : std::enable_shared_from_this<PrefetchBuffer>, public Pro
436437
_reader(reader),
437438
_io_ctx_holder(std::move(io_ctx)),
438439
_io_ctx(_io_ctx_holder.get()),
439-
_buf(new char[buffer_size]),
440440
_sync_profile(std::move(sync_profile)) {}
441441

442442
PrefetchBuffer(PrefetchBuffer&& other)
@@ -465,7 +465,7 @@ struct PrefetchBuffer : std::enable_shared_from_this<PrefetchBuffer>, public Pro
465465
io::FileReader* _reader = nullptr;
466466
std::shared_ptr<const IOContext> _io_ctx_holder;
467467
const IOContext* _io_ctx = nullptr;
468-
std::unique_ptr<char[]> _buf;
468+
PODArray<char> _buf;
469469
BufferStatus _buffer_status {BufferStatus::RESET};
470470
std::mutex _lock;
471471
std::condition_variable _prefetched;

be/src/io/fs/hdfs_file_system.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
#include "common/config.h"
3333
#include "common/status.h"
34+
#include "core/pod_array.h"
3435
#include "io/fs/err_utils.h"
3536
#include "io/fs/hdfs/hdfs_mgr.h"
3637
#include "io/fs/hdfs_file_reader.h"
@@ -296,18 +297,19 @@ Status HdfsFileSystem::download_impl(const Path& remote_file, const Path& local_
296297
// 4. read remote and write to local
297298
LOG(INFO) << "read remote file: " << remote_file << " to local: " << local_file;
298299
constexpr size_t buf_sz = 1024 * 1024;
299-
std::unique_ptr<char[]> read_buf(new char[buf_sz]);
300+
PODArray<char> read_buf;
301+
read_buf.resize(buf_sz);
300302
size_t cur_offset = 0;
301303
while (true) {
302304
size_t read_len = 0;
303-
Slice file_slice(read_buf.get(), buf_sz);
305+
Slice file_slice(read_buf.data(), buf_sz);
304306
RETURN_IF_ERROR(hdfs_reader->read_at(cur_offset, file_slice, &read_len));
305307
cur_offset += read_len;
306308
if (read_len == 0) {
307309
break;
308310
}
309311

310-
RETURN_IF_ERROR(local_writer->append({read_buf.get(), read_len}));
312+
RETURN_IF_ERROR(local_writer->append({read_buf.data(), read_len}));
311313
}
312314
return local_writer->close();
313315
}

be/src/io/fs/http_file_reader.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ HttpFileReader::HttpFileReader(const OpenFileInfo& fileInfo, std::string url, in
8484
}
8585
}
8686

87-
_read_buffer = std::make_unique<char[]>(READ_BUFFER_SIZE);
87+
_read_buffer.resize(READ_BUFFER_SIZE);
8888
}
8989

9090
HttpFileReader::~HttpFileReader() {
@@ -154,8 +154,8 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r
154154
VLOG(2) << "HttpFileReader::read_at_impl offset=" << offset << " size=" << result.size
155155
<< " url=" << _url << " range_supported=" << _range_supported;
156156

157-
if (!_read_buffer) {
158-
_read_buffer = std::make_unique<char[]>(READ_BUFFER_SIZE);
157+
if (_read_buffer.empty()) {
158+
_read_buffer.resize(READ_BUFFER_SIZE);
159159
}
160160

161161
size_t to_read = result.size;
@@ -192,7 +192,7 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r
192192
<< "Buffer overflow: buffer_idx=" << buffer_idx << " copy_len=" << copy_len
193193
<< " READ_BUFFER_SIZE=" << READ_BUFFER_SIZE;
194194

195-
std::memcpy(result.data, _read_buffer.get() + buffer_idx, copy_len);
195+
std::memcpy(result.data, _read_buffer.data() + buffer_idx, copy_len);
196196
buffer_offset += copy_len;
197197
to_read -= copy_len;
198198
offset += copy_len;
@@ -319,12 +319,12 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r
319319
buffer_offset += buf.size();
320320
} else {
321321
size_t cached = std::min(buf.size(), (size_t)READ_BUFFER_SIZE);
322-
std::memcpy(_read_buffer.get(), buf.data(), cached);
322+
std::memcpy(_read_buffer.data(), buf.data(), cached);
323323
_buffer_start = offset;
324324
_buffer_end = offset + cached;
325325

326326
size_t copy_len = std::min(remaining, cached);
327-
std::memcpy(result.data + buffer_offset, _read_buffer.get(), copy_len);
327+
std::memcpy(result.data + buffer_offset, _read_buffer.data(), copy_len);
328328
buffer_offset += copy_len;
329329
}
330330

be/src/io/fs/http_file_reader.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <string>
2424

2525
#include "common/status.h"
26+
#include "core/pod_array.h"
2627
#include "io/fs/file_handle_cache.h"
2728
#include "io/fs/file_reader.h"
2829
#include "io/fs/file_system.h"
@@ -62,7 +63,7 @@ class HttpFileReader final : public FileReader {
6263
// Returns OK on success with _range_supported set appropriately
6364
Status detect_range_support();
6465

65-
std::unique_ptr<char[]> _read_buffer;
66+
PODArray<char> _read_buffer;
6667
static constexpr size_t READ_BUFFER_SIZE = 1 << 20; // 1MB
6768
// Default maximum file size for servers that don't support Range requests
6869
static constexpr size_t DEFAULT_MAX_REQUEST_SIZE = 100 << 20; // 100MB

0 commit comments

Comments
 (0)