fsmonitor--daemon: implement handle_client callback

Teach fsmonitor--daemon to respond to IPC requests from client
Git processes and respond with a list of modified pathnames
relative to the provided token.

Signed-off-by: Jeff Hostetler <jeffhost@microsoft.com>
This commit is contained in:
Jeff Hostetler
2020-12-18 11:43:16 -05:00
committed by Johannes Schindelin
parent 4d728d19c2
commit bd9d11cf3a

View File

@@ -7,6 +7,7 @@
#include "fsmonitor--daemon.h"
#include "simple-ipc.h"
#include "khash.h"
#include "pkt-line.h"
static const char * const builtin_fsmonitor__daemon_usage[] = {
N_("git fsmonitor--daemon --start [<options>]"),
@@ -369,19 +370,349 @@ void fsmonitor_force_resync(struct fsmonitor_daemon_state *state)
fsmonitor_free_token_data(free_me);
}
/*
* Format an opaque token string to send to the client.
*/
static void fsmonitor_format_response_token(
struct strbuf *response_token,
const struct strbuf *response_token_id,
const struct fsmonitor_batch *batch)
{
uint64_t seq_nr = (batch) ? batch->batch_seq_nr + 1 : 0;
strbuf_reset(response_token);
strbuf_addf(response_token, "builtin:%s:%"PRIu64,
response_token_id->buf, seq_nr);
}
/*
* Parse an opaque token from the client.
*/
static int fsmonitor_parse_client_token(const char *buf_token,
struct strbuf *requested_token_id,
uint64_t *seq_nr)
{
const char *p;
char *p_end;
strbuf_reset(requested_token_id);
*seq_nr = 0;
if (!skip_prefix(buf_token, "builtin:", &p))
return 1;
while (*p && *p != ':')
strbuf_addch(requested_token_id, *p++);
if (!*p++)
return 1;
*seq_nr = (uint64_t)strtoumax(p, &p_end, 10);
if (*p_end)
return 1;
return 0;
}
KHASH_INIT(str, const char *, int, 0, kh_str_hash_func, kh_str_hash_equal);
static int do_handle_client(struct fsmonitor_daemon_state *state,
const char *command,
ipc_server_reply_cb *reply,
struct ipc_server_reply_data *reply_data)
{
struct fsmonitor_token_data *token_data = NULL;
struct strbuf response_token = STRBUF_INIT;
struct strbuf requested_token_id = STRBUF_INIT;
struct strbuf payload = STRBUF_INIT;
uint64_t requested_oldest_seq_nr = 0;
uint64_t total_response_len = 0;
const char *p;
const struct fsmonitor_batch *batch_head;
const struct fsmonitor_batch *batch;
intmax_t count = 0, duplicates = 0;
kh_str_t *shown;
int hash_ret;
int result;
/*
* We expect `command` to be of the form:
*
* <command> := quit NUL
* | flush NUL
* | <V1-time-since-epoch-ns> NUL
* | <V2-opaque-fsmonitor-token> NUL
*/
if (!strcmp(command, "quit")) {
/*
* A client has requested over the socket/pipe that the
* daemon shutdown.
*
* Tell the IPC thread pool to shutdown (which completes
* the await in the main thread (which can stop the
* fsmonitor listener thread)).
*
* There is no reply to the client.
*/
return SIMPLE_IPC_QUIT;
}
if (!strcmp(command, "flush")) {
/*
* Flush all of our cached data and generate a new token
* just like if we lost sync with the filesystem.
*
* Then send a trivial response using the new token.
*/
fsmonitor_force_resync(state);
result = 0;
goto send_trivial_response;
}
if (!skip_prefix(command, "builtin:", &p)) {
/* assume V1 timestamp or garbage */
char *p_end;
strtoumax(command, &p_end, 10);
trace_printf_key(&trace_fsmonitor,
((*p_end) ?
"fsmonitor: invalid command line '%s'" :
"fsmonitor: unsupported V1 protocol '%s'"),
command);
result = -1;
goto send_trivial_response;
}
/* try V2 token */
if (fsmonitor_parse_client_token(command, &requested_token_id,
&requested_oldest_seq_nr)) {
trace_printf_key(&trace_fsmonitor,
"fsmonitor: invalid V2 protocol token '%s'",
command);
result = -1;
goto send_trivial_response;
}
pthread_mutex_lock(&state->main_lock);
if (!state->current_token_data) {
/*
* We don't have a current token. This may mean that
* the listener thread has not yet started.
*/
pthread_mutex_unlock(&state->main_lock);
result = 0;
goto send_trivial_response;
}
if (strcmp(requested_token_id.buf,
state->current_token_data->token_id.buf)) {
/*
* The client last spoke to a different daemon
* instance -OR- the daemon had to resync with
* the filesystem (and lost events), so reject.
*/
pthread_mutex_unlock(&state->main_lock);
result = 0;
trace2_data_string("fsmonitor", the_repository,
"response/token", "different");
goto send_trivial_response;
}
if (!state->current_token_data->batch_tail) {
/*
* The listener has not received any filesystem
* events yet since we created the current token.
* We can respond with an empty list, since the
* client has already seen the current token and
* we have nothing new to report. (This is
* instead of sending a trivial response.)
*/
pthread_mutex_unlock(&state->main_lock);
result = 0;
goto send_empty_response;
}
if (requested_oldest_seq_nr <
state->current_token_data->batch_tail->batch_seq_nr) {
/*
* The client wants older events than we have for
* this token_id. This means that the end of our
* batch list was truncated and we cannot give the
* client a complete snapshot relative to their
* request.
*/
pthread_mutex_unlock(&state->main_lock);
trace_printf_key(&trace_fsmonitor,
"client requested truncated data");
result = 0;
goto send_trivial_response;
}
/*
* We're going to hold onto a pointer to the current
* token-data while we walk the list of batches of files.
* During this time, we will NOT be under the lock.
* So we ref-count it.
*
* This allows the listener thread to continue prepending
* new batches of items to the token-data (which we'll ignore).
*
* AND it allows the listener thread to do a token-reset
* (and install a new `current_token_data`).
*
* We mark the current head of the batch list as "pinned" so
* that the listener thread will treat this item as read-only
* (and prevent any more paths from being added to it) from
* now on.
*/
token_data = state->current_token_data;
token_data->client_ref_count++;
batch_head = token_data->batch_head;
((struct fsmonitor_batch *)batch_head)->pinned_time = time(NULL);
pthread_mutex_unlock(&state->main_lock);
/*
* FSMonitor Protocol V2 requires that we send a response header
* with a "new current token" and then all of the paths that changed
* since the "requested token".
*/
fsmonitor_format_response_token(&response_token,
&token_data->token_id,
batch_head);
reply(reply_data, response_token.buf, response_token.len + 1);
total_response_len += response_token.len + 1;
trace2_data_string("fsmonitor", the_repository, "response/token",
response_token.buf);
trace_printf_key(&trace_fsmonitor, "response token: %s", response_token.buf);
shown = kh_init_str();
for (batch = batch_head;
batch && batch->batch_seq_nr >= requested_oldest_seq_nr;
batch = batch->next) {
size_t k;
for (k = 0; k < batch->nr; k++) {
const char *s = batch->interned_paths[k];
size_t s_len;
if (kh_get_str(shown, s) != kh_end(shown))
duplicates++;
else {
kh_put_str(shown, s, &hash_ret);
trace_printf_key(&trace_fsmonitor,
"send[%"PRIuMAX"]: %s",
count, s);
/* Each path gets written with a trailing NUL */
s_len = strlen(s) + 1;
if (payload.len + s_len >=
LARGE_PACKET_DATA_MAX) {
reply(reply_data, payload.buf,
payload.len);
total_response_len += payload.len;
strbuf_reset(&payload);
}
strbuf_add(&payload, s, s_len);
count++;
}
}
}
if (payload.len) {
reply(reply_data, payload.buf, payload.len);
total_response_len += payload.len;
}
kh_release_str(shown);
pthread_mutex_lock(&state->main_lock);
if (token_data->client_ref_count > 0)
token_data->client_ref_count--;
if (token_data->client_ref_count == 0) {
if (token_data != state->current_token_data) {
/*
* The listener thread did a token-reset while we were
* walking the batch list. Therefore, this token is
* stale and can be discarded completely. If we are
* the last reader thread using this token, we own
* that work.
*/
fsmonitor_free_token_data(token_data);
}
}
pthread_mutex_unlock(&state->main_lock);
trace2_data_intmax("fsmonitor", the_repository, "response/length", total_response_len);
trace2_data_intmax("fsmonitor", the_repository, "response/count/files", count);
trace2_data_intmax("fsmonitor", the_repository, "response/count/duplicates", duplicates);
strbuf_release(&response_token);
strbuf_release(&requested_token_id);
strbuf_release(&payload);
return 0;
send_trivial_response:
pthread_mutex_lock(&state->main_lock);
fsmonitor_format_response_token(&response_token,
&state->current_token_data->token_id,
state->current_token_data->batch_head);
pthread_mutex_unlock(&state->main_lock);
reply(reply_data, response_token.buf, response_token.len + 1);
trace2_data_string("fsmonitor", the_repository, "response/token",
response_token.buf);
reply(reply_data, "/", 2);
trace2_data_intmax("fsmonitor", the_repository, "response/trivial", 1);
strbuf_release(&response_token);
strbuf_release(&requested_token_id);
return result;
send_empty_response:
pthread_mutex_lock(&state->main_lock);
fsmonitor_format_response_token(&response_token,
&state->current_token_data->token_id,
NULL);
pthread_mutex_unlock(&state->main_lock);
reply(reply_data, response_token.buf, response_token.len + 1);
trace2_data_string("fsmonitor", the_repository, "response/token",
response_token.buf);
trace2_data_intmax("fsmonitor", the_repository, "response/empty", 1);
strbuf_release(&response_token);
strbuf_release(&requested_token_id);
return 0;
}
static ipc_server_application_cb handle_client;
static int handle_client(void *data, const char *command,
ipc_server_reply_cb *reply,
struct ipc_server_reply_data *reply_data)
{
/* struct fsmonitor_daemon_state *state = data; */
struct fsmonitor_daemon_state *state = data;
int result;
trace_printf_key(&trace_fsmonitor, "requested token: %s", command);
trace2_region_enter("fsmonitor", "handle_client", the_repository);
trace2_data_string("fsmonitor", the_repository, "request", command);
result = 0; /* TODO Do something here. */
result = do_handle_client(state, command, reply, reply_data);
trace2_region_leave("fsmonitor", "handle_client", the_repository);