Merge branch 'jt/odb-transaction-write' into ps/odb-in-memory

* jt/odb-transaction-write:
  odb/transaction: make `write_object_stream()` pluggable
  object-file: generalize packfile writes to use odb_write_stream
  object-file: avoid fd seekback by checking object size upfront
  object-file: remove flags from transaction packfile writes
  odb: update `struct odb_write_stream` read() callback
  odb/transaction: use pluggable `begin_transaction()`
  odb: split `struct odb_transaction` into separate header
This commit is contained in:
Junio C Hamano
2026-04-09 11:16:58 -07:00
14 changed files with 304 additions and 214 deletions

View File

@@ -1219,6 +1219,7 @@ LIB_OBJS += odb.o
LIB_OBJS += odb/source.o
LIB_OBJS += odb/source-files.o
LIB_OBJS += odb/streaming.o
LIB_OBJS += odb/transaction.o
LIB_OBJS += oid-array.o
LIB_OBJS += oidmap.o
LIB_OBJS += oidset.o

View File

@@ -16,6 +16,7 @@
#include "run-command.h"
#include "object-file.h"
#include "odb.h"
#include "odb/transaction.h"
#include "parse-options.h"
#include "path.h"
#include "preload-index.h"

View File

@@ -9,6 +9,8 @@
#include "hex.h"
#include "object-file.h"
#include "odb.h"
#include "odb/streaming.h"
#include "odb/transaction.h"
#include "object.h"
#include "delta.h"
#include "pack.h"
@@ -359,24 +361,21 @@ static void unpack_non_delta_entry(enum object_type type, unsigned long size,
/*
 * Adapter state for feeding already-deflated pack input through an
 * odb_write_stream: wraps a zstream plus the status of the last
 * inflate step.
 */
struct input_zstream_data {
	/* The zlib stream the data is inflated through. */
	git_zstream *zstream;
	/*
	 * NOTE(review): the new read callback decompresses directly into the
	 * caller-provided buffer, so this scratch buffer looks unused now —
	 * confirm and drop if so.
	 */
	unsigned char buf[8192];
	/* Result of the most recent git_inflate() call (Z_OK, Z_STREAM_END, ...). */
	int status;
};
static const void *feed_input_zstream(struct odb_write_stream *in_stream,
unsigned long *readlen)
static ssize_t feed_input_zstream(struct odb_write_stream *in_stream,
unsigned char *buf, size_t buf_len)
{
struct input_zstream_data *data = in_stream->data;
git_zstream *zstream = data->zstream;
void *in = fill(1);
if (in_stream->is_finished) {
*readlen = 0;
return NULL;
}
if (in_stream->is_finished)
return 0;
zstream->next_out = data->buf;
zstream->avail_out = sizeof(data->buf);
zstream->next_out = buf;
zstream->avail_out = buf_len;
zstream->next_in = in;
zstream->avail_in = len;
@@ -384,9 +383,7 @@ static const void *feed_input_zstream(struct odb_write_stream *in_stream,
in_stream->is_finished = data->status != Z_OK;
use(len - zstream->avail_in);
*readlen = sizeof(data->buf) - zstream->avail_out;
return data->buf;
return buf_len - zstream->avail_out;
}
static void stream_blob(unsigned long size, unsigned nr)

View File

@@ -19,6 +19,7 @@
#include "tree-walk.h"
#include "object-file.h"
#include "odb.h"
#include "odb/transaction.h"
#include "refs.h"
#include "resolve-undo.h"
#include "parse-options.h"

View File

@@ -10,6 +10,7 @@
#include "cache-tree.h"
#include "object-file.h"
#include "odb.h"
#include "odb/transaction.h"
#include "read-cache-ll.h"
#include "replace-object.h"
#include "repository.h"

View File

@@ -405,6 +405,7 @@ libgit_sources = [
'odb/source.c',
'odb/source-files.c',
'odb/streaming.c',
'odb/transaction.c',
'oid-array.c',
'oidmap.c',
'oidset.c',

View File

@@ -21,6 +21,7 @@
#include "object-file.h"
#include "odb.h"
#include "odb/streaming.h"
#include "odb/transaction.h"
#include "oidtree.h"
#include "pack.h"
#include "packfile.h"
@@ -1068,6 +1069,7 @@ int odb_source_loose_write_stream(struct odb_source *source,
struct git_hash_ctx c, compat_c;
struct strbuf tmp_file = STRBUF_INIT;
struct strbuf filename = STRBUF_INIT;
unsigned char buf[8192];
int dirlen;
char hdr[MAX_HEADER_LEN];
int hdrlen;
@@ -1100,9 +1102,16 @@ int odb_source_loose_write_stream(struct odb_source *source,
unsigned char *in0 = stream.next_in;
if (!stream.avail_in && !in_stream->is_finished) {
const void *in = in_stream->read(in_stream, &stream.avail_in);
stream.next_in = (void *)in;
in0 = (unsigned char *)in;
ssize_t read_len = odb_write_stream_read(in_stream, buf,
sizeof(buf));
if (read_len < 0) {
err = -1;
goto cleanup;
}
stream.avail_in = read_len;
stream.next_in = buf;
in0 = buf;
/* All data has been read. */
if (in_stream->is_finished)
flush = 1;
@@ -1392,11 +1401,10 @@ static int already_written(struct odb_transaction_files *transaction,
}
/* Lazily create backing packfile for the state */
static void prepare_packfile_transaction(struct odb_transaction_files *transaction,
unsigned flags)
static void prepare_packfile_transaction(struct odb_transaction_files *transaction)
{
struct transaction_packfile *state = &transaction->packfile;
if (!(flags & INDEX_WRITE_OBJECT) || state->f)
if (state->f)
return;
state->f = create_tmp_packfile(transaction->base.source->odb->repo,
@@ -1409,33 +1417,53 @@ static void prepare_packfile_transaction(struct odb_transaction_files *transacti
die_errno("unable to write pack header");
}
/*
 * Compute the object ID of a blob of `size` bytes whose contents are
 * provided by `stream`, without writing the object anywhere.
 *
 * The hash covers the canonical "blob <size>\0" header followed by the
 * streamed payload. Returns 0 on success and writes the ID into
 * `result_oid`; returns -1 if reading from the stream fails or the
 * stream did not deliver exactly `size` bytes.
 */
static int hash_blob_stream(struct odb_write_stream *stream,
			    const struct git_hash_algo *hash_algo,
			    struct object_id *result_oid, size_t size)
{
	unsigned char block[16384];
	struct git_hash_ctx hash_state;
	size_t total = 0;
	unsigned hdr_len;

	/* Hash the object header before any payload bytes. */
	hdr_len = format_object_header((char *)block, sizeof(block),
				       OBJ_BLOB, size);
	hash_algo->init_fn(&hash_state);
	git_hash_update(&hash_state, block, hdr_len);

	for (;;) {
		ssize_t got;

		if (stream->is_finished)
			break;

		got = odb_write_stream_read(stream, block, sizeof(block));
		if (got < 0)
			return -1;

		git_hash_update(&hash_state, block, got);
		total += got;
	}

	/* The advertised size must match what the stream produced. */
	if (total != size)
		return -1;

	git_hash_final_oid(result_oid, &hash_state);
	return 0;
}
/*
* Read the contents from fd for size bytes, streaming it to the
* packfile in state while updating the hash in ctx. Signal a failure
* by returning a negative value when the resulting pack would exceed
* the pack size limit and this is not the first object in the pack,
* so that the caller can discard what we wrote from the current pack
* by truncating it and opening a new one. The caller will then call
* us again after rewinding the input fd.
*
* The already_hashed_to pointer is kept untouched by the caller to
* make sure we do not hash the same byte when we are called
* again. This way, the caller does not have to checkpoint its hash
* status before calling us just in case we ask it to call us again
* with a new pack.
* Read the contents from the stream provided, streaming it to the
* packfile in state while updating the hash in ctx.
*/
static int stream_blob_to_pack(struct transaction_packfile *state,
struct git_hash_ctx *ctx, off_t *already_hashed_to,
int fd, size_t size, const char *path,
unsigned flags)
static void stream_blob_to_pack(struct transaction_packfile *state,
struct git_hash_ctx *ctx, size_t size,
struct odb_write_stream *stream)
{
git_zstream s;
unsigned char ibuf[16384];
unsigned char obuf[16384];
unsigned hdrlen;
int status = Z_OK;
int write_object = (flags & INDEX_WRITE_OBJECT);
off_t offset = 0;
size_t bytes_read = 0;
git_deflate_init(&s, pack_compression_level);
@@ -1444,45 +1472,27 @@ static int stream_blob_to_pack(struct transaction_packfile *state,
s.avail_out = sizeof(obuf) - hdrlen;
while (status != Z_STREAM_END) {
if (size && !s.avail_in) {
size_t rsize = size < sizeof(ibuf) ? size : sizeof(ibuf);
ssize_t read_result = read_in_full(fd, ibuf, rsize);
if (read_result < 0)
die_errno("failed to read from '%s'", path);
if ((size_t)read_result != rsize)
die("failed to read %u bytes from '%s'",
(unsigned)rsize, path);
offset += rsize;
if (*already_hashed_to < offset) {
size_t hsize = offset - *already_hashed_to;
if (rsize < hsize)
hsize = rsize;
if (hsize)
git_hash_update(ctx, ibuf, hsize);
*already_hashed_to = offset;
}
if (!stream->is_finished && !s.avail_in) {
ssize_t rsize = odb_write_stream_read(stream, ibuf,
sizeof(ibuf));
if (rsize < 0)
die("failed to read blob data");
git_hash_update(ctx, ibuf, rsize);
s.next_in = ibuf;
s.avail_in = rsize;
size -= rsize;
bytes_read += rsize;
}
status = git_deflate(&s, size ? 0 : Z_FINISH);
status = git_deflate(&s, stream->is_finished ? Z_FINISH : 0);
if (!s.avail_out || status == Z_STREAM_END) {
if (write_object) {
size_t written = s.next_out - obuf;
size_t written = s.next_out - obuf;
/* would we bust the size limit? */
if (state->nr_written &&
pack_size_limit_cfg &&
pack_size_limit_cfg < state->offset + written) {
git_deflate_abort(&s);
return -1;
}
hashwrite(state->f, obuf, written);
state->offset += written;
}
hashwrite(state->f, obuf, written);
state->offset += written;
s.next_out = obuf;
s.avail_out = sizeof(obuf);
}
@@ -1496,8 +1506,12 @@ static int stream_blob_to_pack(struct transaction_packfile *state,
die("unexpected deflate failure: %d", status);
}
}
if (bytes_read != size)
die("read %" PRIuMAX " bytes of blob data, but expected %" PRIuMAX " bytes",
(uintmax_t)bytes_read, (uintmax_t)size);
git_deflate_end(&s);
return 0;
}
static void flush_packfile_transaction(struct odb_transaction_files *transaction)
@@ -1568,64 +1582,49 @@ clear_exit:
* binary blobs, they generally do not want to get any conversion, and
* callers should avoid this code path when filters are requested.
*/
static int index_blob_packfile_transaction(struct odb_transaction_files *transaction,
struct object_id *result_oid, int fd,
size_t size, const char *path,
unsigned flags)
static int odb_transaction_files_write_object_stream(struct odb_transaction *base,
struct odb_write_stream *stream,
size_t size,
struct object_id *result_oid)
{
struct odb_transaction_files *transaction = container_of(base,
struct odb_transaction_files,
base);
struct transaction_packfile *state = &transaction->packfile;
off_t seekback, already_hashed_to;
struct git_hash_ctx ctx;
unsigned char obuf[16384];
unsigned header_len;
struct hashfile_checkpoint checkpoint;
struct pack_idx_entry *idx = NULL;
seekback = lseek(fd, 0, SEEK_CUR);
if (seekback == (off_t)-1)
return error("cannot find the current offset");
struct pack_idx_entry *idx;
header_len = format_object_header((char *)obuf, sizeof(obuf),
OBJ_BLOB, size);
transaction->base.source->odb->repo->hash_algo->init_fn(&ctx);
git_hash_update(&ctx, obuf, header_len);
/* Note: idx is non-NULL when we are writing */
if ((flags & INDEX_WRITE_OBJECT) != 0) {
CALLOC_ARRAY(idx, 1);
prepare_packfile_transaction(transaction, flags);
hashfile_checkpoint_init(state->f, &checkpoint);
}
already_hashed_to = 0;
while (1) {
prepare_packfile_transaction(transaction, flags);
if (idx) {
hashfile_checkpoint(state->f, &checkpoint);
idx->offset = state->offset;
crc32_begin(state->f);
}
if (!stream_blob_to_pack(state, &ctx, &already_hashed_to,
fd, size, path, flags))
break;
/*
* Writing this object to the current pack will make
* it too big; we need to truncate it, start a new
* pack, and write into it.
*/
if (!idx)
BUG("should not happen");
hashfile_truncate(state->f, &checkpoint);
state->offset = checkpoint.offset;
/*
* If writing another object to the packfile could result in it
* exceeding the configured size limit, flush the current packfile
* transaction.
*
* Note that this uses the inflated object size as an approximation.
* Blob objects written in this manner are not delta-compressed, so
* the difference between the inflated and on-disk size is limited
* to zlib compression and is sufficient for this check.
*/
if (state->nr_written && pack_size_limit_cfg &&
pack_size_limit_cfg < state->offset + size)
flush_packfile_transaction(transaction);
if (lseek(fd, seekback, SEEK_SET) == (off_t)-1)
return error("cannot seek back");
}
CALLOC_ARRAY(idx, 1);
prepare_packfile_transaction(transaction);
hashfile_checkpoint_init(state->f, &checkpoint);
hashfile_checkpoint(state->f, &checkpoint);
idx->offset = state->offset;
crc32_begin(state->f);
stream_blob_to_pack(state, &ctx, size, stream);
git_hash_final_oid(result_oid, &ctx);
if (!idx)
return 0;
idx->crc32 = crc32_end(state->f);
if (already_written(transaction, result_oid)) {
@@ -1642,34 +1641,6 @@ static int index_blob_packfile_transaction(struct odb_transaction_files *transac
return 0;
}
/*
 * Compute the object ID of a blob of `size` bytes read from `fd`,
 * without storing the object. Hashes the canonical blob header
 * followed by the file contents. Returns 0 on success with the ID in
 * `result_oid`, or -1 when the descriptor yields fewer bytes than
 * promised or a read fails.
 */
static int hash_blob_stream(const struct git_hash_algo *hash_algo,
			    struct object_id *result_oid, int fd, size_t size)
{
	unsigned char block[16384];
	struct git_hash_ctx hash_state;
	unsigned hdr_len;
	size_t remaining;

	hdr_len = format_object_header((char *)block, sizeof(block),
				       OBJ_BLOB, size);
	hash_algo->init_fn(&hash_state);
	git_hash_update(&hash_state, block, hdr_len);

	for (remaining = size; remaining; ) {
		size_t chunk = remaining;
		ssize_t got;

		if (chunk > sizeof(block))
			chunk = sizeof(block);

		got = read_in_full(fd, block, chunk);
		/* A short or failed read means the caller lied about size. */
		if (got < 0 || (size_t)got != chunk)
			return -1;

		git_hash_update(&hash_state, block, chunk);
		remaining -= chunk;
	}

	git_hash_final_oid(result_oid, &hash_state);
	return 0;
}
int index_fd(struct index_state *istate, struct object_id *oid,
int fd, struct stat *st,
enum object_type type, const char *path, unsigned flags)
@@ -1691,23 +1662,25 @@ int index_fd(struct index_state *istate, struct object_id *oid,
ret = index_core(istate, oid, fd, xsize_t(st->st_size),
type, path, flags);
} else {
struct odb_write_stream stream;
odb_write_stream_from_fd(&stream, fd, xsize_t(st->st_size));
if (flags & INDEX_WRITE_OBJECT) {
struct object_database *odb = the_repository->objects;
struct odb_transaction_files *files_transaction;
struct odb_transaction *transaction;
struct odb_transaction *transaction = odb_transaction_begin(odb);
transaction = odb_transaction_begin(odb);
files_transaction = container_of(odb->transaction,
struct odb_transaction_files,
base);
ret = index_blob_packfile_transaction(files_transaction, oid, fd,
xsize_t(st->st_size),
path, flags);
ret = odb_transaction_write_object_stream(odb->transaction,
&stream,
xsize_t(st->st_size),
oid);
odb_transaction_commit(transaction);
} else {
ret = hash_blob_stream(the_repository->hash_algo, oid,
fd, xsize_t(st->st_size));
ret = hash_blob_stream(&stream,
the_repository->hash_algo, oid,
xsize_t(st->st_size));
}
odb_write_stream_release(&stream);
}
close(fd);
@@ -2225,6 +2198,7 @@ struct odb_transaction *odb_transaction_files_begin(struct odb_source *source)
transaction = xcalloc(1, sizeof(*transaction));
transaction->base.source = source;
transaction->base.commit = odb_transaction_files_commit;
transaction->base.write_object_stream = odb_transaction_files_write_object_stream;
return &transaction->base;
}

25
odb.c
View File

@@ -1154,28 +1154,3 @@ void odb_reprepare(struct object_database *o)
obj_read_unlock();
}
/*
 * Open a new transaction against the primary source of the object
 * database. Only a single transaction may be pending at a time, so
 * return NULL when one is already open.
 */
struct odb_transaction *odb_transaction_begin(struct object_database *odb)
{
	struct odb_transaction *transaction;

	if (odb->transaction)
		return NULL;

	transaction = odb_transaction_files_begin(odb->sources);
	odb->transaction = transaction;
	return transaction;
}
void odb_transaction_commit(struct odb_transaction *transaction)
{
if (!transaction)
return;
/*
* Ensure the transaction ending matches the pending transaction.
*/
ASSERT(transaction == transaction->source->odb->transaction);
transaction->commit(transaction);
transaction->source->odb->transaction = NULL;
free(transaction);
}

37
odb.h
View File

@@ -29,24 +29,6 @@ extern int fetch_if_missing;
*/
char *compute_alternate_path(const char *path, struct strbuf *err);
/*
* A transaction may be started for an object database prior to writing new
* objects via odb_transaction_begin(). These objects are not committed until
* odb_transaction_commit() is invoked. Only a single transaction may be pending
* at a time.
*
* Each ODB source is expected to implement its own transaction handling.
*/
struct odb_transaction;
typedef void (*odb_transaction_commit_fn)(struct odb_transaction *transaction);
struct odb_transaction {
/* The ODB source the transaction is opened against. */
struct odb_source *source;
/* The ODB source specific callback invoked to commit a transaction. */
odb_transaction_commit_fn commit;
};
/*
* The object database encapsulates access to objects in a repository. It
* manages one or more sources that store the actual objects which are
@@ -148,19 +130,6 @@ void odb_close(struct object_database *o);
*/
void odb_reprepare(struct object_database *o);
/*
* Starts an ODB transaction. Subsequent objects are written to the transaction
* and not committed until odb_transaction_commit() is invoked on the
* transaction. If the ODB already has a pending transaction, NULL is returned.
*/
struct odb_transaction *odb_transaction_begin(struct object_database *odb);
/*
* Commits an ODB transaction making the written objects visible. If the
* specified transaction is NULL, the function is a no-op.
*/
void odb_transaction_commit(struct odb_transaction *transaction);
/*
* Find source by its object directory path. Returns a `NULL` pointer in case
* the source could not be found.
@@ -593,11 +562,7 @@ static inline int odb_write_object(struct object_database *odb,
return odb_write_object_ext(odb, buf, len, type, oid, NULL, 0);
}
struct odb_write_stream {
const void *(*read)(struct odb_write_stream *, unsigned long *len);
void *data;
int is_finished;
};
struct odb_write_stream;
int odb_write_object_stream(struct object_database *odb,
struct odb_write_stream *stream, size_t len,

View File

@@ -232,6 +232,16 @@ struct odb_read_stream *odb_read_stream_open(struct object_database *odb,
return st;
}
/*
 * Read up to `sz` bytes from the write stream into `buf` by dispatching
 * to the stream's read() callback. Returns the number of bytes read, 0
 * once the stream is finished, or a negative value on error (per the
 * callback's contract).
 */
ssize_t odb_write_stream_read(struct odb_write_stream *st, void *buf, size_t sz)
{
	return st->read(st, buf, sz);
}
void odb_write_stream_release(struct odb_write_stream *st)
{
free(st->data);
}
int odb_stream_blob_to_fd(struct object_database *odb,
int fd,
const struct object_id *oid,
@@ -287,3 +297,44 @@ int odb_stream_blob_to_fd(struct object_database *odb,
odb_read_stream_close(st);
return result;
}
/* Callback state for an odb_write_stream backed by a file descriptor. */
struct read_object_fd_data {
	/* Descriptor the object contents are read from. */
	int fd;
	/* Bytes of the object left to read before the stream is finished. */
	size_t remaining;
};
/*
 * read() callback for fd-backed write streams: fill `buf` with up to
 * `len` bytes from the descriptor, capped by how many bytes the object
 * has left. Marks the stream finished once all bytes were delivered.
 * Returns bytes read, 0 at end of stream, or -1 on a failed or short
 * read.
 */
static ssize_t read_object_fd(struct odb_write_stream *stream,
			      unsigned char *buf, size_t len)
{
	struct read_object_fd_data *data = stream->data;
	size_t want;
	ssize_t got;

	if (stream->is_finished)
		return 0;

	/* Never read past the declared object size. */
	want = len;
	if (data->remaining < want)
		want = data->remaining;

	got = read_in_full(data->fd, buf, want);
	if (got < 0 || (size_t)got != want)
		return -1;

	data->remaining -= want;
	if (!data->remaining)
		stream->is_finished = 1;

	return got;
}
/*
 * Initialize `stream` to read `size` bytes of object contents from
 * `fd`. Allocates callback state that must later be freed via
 * odb_write_stream_release(). The descriptor remains owned by the
 * caller.
 */
void odb_write_stream_from_fd(struct odb_write_stream *stream, int fd,
			      size_t size)
{
	struct read_object_fd_data *data;
	CALLOC_ARRAY(data, 1);
	data->fd = fd;
	data->remaining = size;
	stream->data = data;
	stream->read = read_object_fd;
	stream->is_finished = 0;
}

View File

@@ -5,6 +5,7 @@
#define STREAMING_H 1
#include "object.h"
#include "odb.h"
struct object_database;
struct odb_read_stream;
@@ -47,6 +48,29 @@ int odb_read_stream_close(struct odb_read_stream *stream);
*/
ssize_t odb_read_stream_read(struct odb_read_stream *stream, void *buf, size_t len);
/*
* A stream that provides an object to be written to the object database without
* loading all of it into memory.
*/
struct odb_write_stream {
	/*
	 * Fill the buffer with up to the given number of bytes of object
	 * contents. Returns bytes read, 0 when the stream is finished, or
	 * a negative value on error.
	 */
	ssize_t (*read)(struct odb_write_stream *, unsigned char *, size_t);
	/* Callback-specific state, owned by the stream implementation. */
	void *data;
	/* Set to 1 once all object contents have been produced. */
	int is_finished;
};
/*
* Read data from the stream into the buffer. Returns 0 when finished and the
* number of bytes read on success. Returns a negative error code in case
* reading from the stream fails.
*/
ssize_t odb_write_stream_read(struct odb_write_stream *stream, void *buf,
size_t len);
/*
* Releases memory allocated for underlying stream data.
*/
void odb_write_stream_release(struct odb_write_stream *stream);
/*
* Look up the object by its ID and write the full contents to the file
* descriptor. The object must be a blob, or the function will fail. When
@@ -64,4 +88,10 @@ int odb_stream_blob_to_fd(struct object_database *odb,
struct stream_filter *filter,
int can_seek);
/*
* Sets up an ODB write stream that reads from an fd.
*/
void odb_write_stream_from_fd(struct odb_write_stream *stream, int fd,
size_t size);
#endif /* STREAMING_H */

35
odb/transaction.c Normal file
View File

@@ -0,0 +1,35 @@
#include "git-compat-util.h"
#include "odb/source.h"
#include "odb/transaction.h"
/*
 * Open a transaction against the primary source of the object
 * database. Only one transaction may be pending at a time; return
 * NULL when one is already open.
 */
struct odb_transaction *odb_transaction_begin(struct object_database *odb)
{
	if (odb->transaction != NULL)
		return NULL;

	/* Let the source plug in its own transaction implementation. */
	odb_source_begin_transaction(odb->sources, &odb->transaction);

	return odb->transaction;
}
void odb_transaction_commit(struct odb_transaction *transaction)
{
if (!transaction)
return;
/*
* Ensure the transaction ending matches the pending transaction.
*/
ASSERT(transaction == transaction->source->odb->transaction);
transaction->commit(transaction);
transaction->source->odb->transaction = NULL;
free(transaction);
}
/*
 * Write the object provided by `stream` (of `len` bytes) into the
 * transaction by dispatching to the source-specific callback. The
 * resulting object ID is stored in `oid`. Returns 0 on success, a
 * negative error code otherwise. The transaction must be non-NULL.
 */
int odb_transaction_write_object_stream(struct odb_transaction *transaction,
					struct odb_write_stream *stream,
					size_t len, struct object_id *oid)
{
	return transaction->write_object_stream(transaction, stream, len, oid);
}

57
odb/transaction.h Normal file
View File

@@ -0,0 +1,57 @@
#ifndef ODB_TRANSACTION_H
#define ODB_TRANSACTION_H
#include "odb.h"
#include "odb/source.h"
/*
* A transaction may be started for an object database prior to writing new
* objects via odb_transaction_begin(). These objects are not committed until
* odb_transaction_commit() is invoked. Only a single transaction may be pending
* at a time.
*
* Each ODB source is expected to implement its own transaction handling.
*/
struct odb_transaction {
	/* The ODB source the transaction is opened against. */
	struct odb_source *source;

	/*
	 * The ODB source specific callback invoked to commit a transaction.
	 * Invoked (exactly once) by odb_transaction_commit(), which then
	 * frees the transaction.
	 */
	void (*commit)(struct odb_transaction *transaction);

	/*
	 * This callback is expected to write the given object stream into
	 * the ODB transaction. Note that for now, only blobs support streaming.
	 *
	 * The resulting object ID shall be written into the out pointer. The
	 * callback is expected to return 0 on success, a negative error code
	 * otherwise.
	 */
	int (*write_object_stream)(struct odb_transaction *transaction,
				   struct odb_write_stream *stream, size_t len,
				   struct object_id *oid);
};
/*
* Starts an ODB transaction. Subsequent objects are written to the transaction
* and not committed until odb_transaction_commit() is invoked on the
* transaction. If the ODB already has a pending transaction, NULL is returned.
*/
struct odb_transaction *odb_transaction_begin(struct object_database *odb);
/*
* Commits an ODB transaction making the written objects visible. If the
* specified transaction is NULL, the function is a no-op.
*/
void odb_transaction_commit(struct odb_transaction *transaction);
/*
* Writes the object in the provided stream into the transaction. The resulting
* object ID is written into the out pointer. Returns 0 on success, a negative
* error code otherwise.
*/
int odb_transaction_write_object_stream(struct odb_transaction *transaction,
struct odb_write_stream *stream,
size_t len, struct object_id *oid);
#endif

View File

@@ -20,6 +20,7 @@
#include "dir.h"
#include "object-file.h"
#include "odb.h"
#include "odb/transaction.h"
#include "oid-array.h"
#include "tree.h"
#include "commit.h"