0
0
mirror of https://github.com/vim/vim.git synced 2025-09-30 04:44:14 -04:00

patch 8.0.0957: a terminal job can deadlock when sending many keys

Problem:    When term_sendkeys() sends many keys it may get stuck in writing
            to the job.
Solution:   Make the write non-blocking, buffer keys to be sent.
This commit is contained in:
Bram Moolenaar
2017-08-18 20:50:30 +02:00
parent cfce71710b
commit 97bd5e6527
6 changed files with 217 additions and 39 deletions

View File

@@ -1373,7 +1373,7 @@ can_write_buf_line(channel_T *channel)
} }
/* /*
* Write any lines to the input channel. * Write any buffer lines to the input channel.
*/ */
static void static void
channel_write_in(channel_T *channel) channel_write_in(channel_T *channel)
@@ -1445,6 +1445,25 @@ channel_buffer_free(buf_T *buf)
} }
} }
/*
* Write any lines waiting to be written to "channel".
*/
static void
channel_write_input(channel_T *channel)
{
chanpart_T *in_part = &channel->ch_part[PART_IN];
if (in_part->ch_writeque.wq_next != NULL)
channel_send(channel, PART_IN, (char_u *)"", 0, "channel_write_input");
else if (in_part->ch_bufref.br_buf != NULL)
{
if (in_part->ch_buf_append)
channel_write_new_lines(in_part->ch_bufref.br_buf);
else
channel_write_in(channel);
}
}
/* /*
* Write any lines waiting to be written to a channel. * Write any lines waiting to be written to a channel.
*/ */
@@ -1454,17 +1473,7 @@ channel_write_any_lines(void)
channel_T *channel; channel_T *channel;
for (channel = first_channel; channel != NULL; channel = channel->ch_next) for (channel = first_channel; channel != NULL; channel = channel->ch_next)
{ channel_write_input(channel);
chanpart_T *in_part = &channel->ch_part[PART_IN];
if (in_part->ch_bufref.br_buf != NULL)
{
if (in_part->ch_buf_append)
channel_write_new_lines(in_part->ch_bufref.br_buf);
else
channel_write_in(channel);
}
}
} }
/* /*
@@ -2984,7 +2993,9 @@ channel_fill_wfds(int maxfd_arg, fd_set *wfds)
{ {
chanpart_T *in_part = &ch->ch_part[PART_IN]; chanpart_T *in_part = &ch->ch_part[PART_IN];
if (in_part->ch_fd != INVALID_FD && in_part->ch_bufref.br_buf != NULL) if (in_part->ch_fd != INVALID_FD
&& (in_part->ch_bufref.br_buf != NULL
|| in_part->ch_writeque.wq_next != NULL))
{ {
FD_SET((int)in_part->ch_fd, wfds); FD_SET((int)in_part->ch_fd, wfds);
if ((int)in_part->ch_fd >= maxfd) if ((int)in_part->ch_fd >= maxfd)
@@ -3529,6 +3540,31 @@ channel_handle_events(void)
} }
# endif # endif
/*
* Set "channel"/"part" to non-blocking.
*/
void
channel_set_nonblock(channel_T *channel, ch_part_T part)
{
chanpart_T *ch_part = &channel->ch_part[part];
int fd = ch_part->ch_fd;
if (fd != INVALID_FD)
{
#ifdef _WIN32
if (part == PART_SOCK)
{
u_long val = 1;
ioctlsocket(fd, FIONBIO, &val);
}
else
#endif
fcntl(fd, F_SETFL, O_NONBLOCK);
ch_part->ch_nonblocking = TRUE;
}
}
/* /*
* Write "buf" (NUL terminated string) to "channel"/"part". * Write "buf" (NUL terminated string) to "channel"/"part".
* When "fun" is not NULL an error message might be given. * When "fun" is not NULL an error message might be given.
@@ -3538,14 +3574,16 @@ channel_handle_events(void)
channel_send( channel_send(
channel_T *channel, channel_T *channel,
ch_part_T part, ch_part_T part,
char_u *buf, char_u *buf_arg,
int len, int len_arg,
char *fun) char *fun)
{ {
int res; int res;
sock_T fd; sock_T fd;
chanpart_T *ch_part = &channel->ch_part[part];
int did_use_queue = FALSE;
fd = channel->ch_part[part].ch_fd; fd = ch_part->ch_fd;
if (fd == INVALID_FD) if (fd == INVALID_FD)
{ {
if (!channel->ch_error && fun != NULL) if (!channel->ch_error && fun != NULL)
@@ -3561,29 +3599,145 @@ channel_send(
{ {
ch_log_lead("SEND ", channel); ch_log_lead("SEND ", channel);
fprintf(log_fd, "'"); fprintf(log_fd, "'");
ignored = (int)fwrite(buf, len, 1, log_fd); ignored = (int)fwrite(buf_arg, len_arg, 1, log_fd);
fprintf(log_fd, "'\n"); fprintf(log_fd, "'\n");
fflush(log_fd); fflush(log_fd);
did_log_msg = TRUE; did_log_msg = TRUE;
} }
if (part == PART_SOCK) for (;;)
res = sock_write(fd, (char *)buf, len);
else
res = fd_write(fd, (char *)buf, len);
if (res != len)
{ {
if (!channel->ch_error && fun != NULL) writeq_T *wq = &ch_part->ch_writeque;
{ char_u *buf;
ch_error(channel, "%s(): write failed", fun); int len;
EMSG2(_("E631: %s(): write failed"), fun);
}
channel->ch_error = TRUE;
return FAIL;
}
channel->ch_error = FALSE; if (wq->wq_next != NULL)
return OK; {
/* first write what was queued */
buf = wq->wq_next->wq_ga.ga_data;
len = wq->wq_next->wq_ga.ga_len;
did_use_queue = TRUE;
}
else
{
if (len_arg == 0)
/* nothing to write, called from channel_select_check() */
return OK;
buf = buf_arg;
len = len_arg;
}
if (part == PART_SOCK)
res = sock_write(fd, (char *)buf, len);
else
res = fd_write(fd, (char *)buf, len);
if (res < 0 && (errno == EWOULDBLOCK
#ifdef EAGAIN
|| errno == EAGAIN
#endif
))
res = 0; /* nothing got written */
if (res >= 0 && ch_part->ch_nonblocking)
{
writeq_T *entry = wq->wq_next;
if (did_use_queue)
ch_log(channel, "Sent %d bytes now", res);
if (res == len)
{
/* Wrote all the buf[len] bytes. */
if (entry != NULL)
{
/* Remove the entry from the write queue. */
ga_clear(&entry->wq_ga);
wq->wq_next = entry->wq_next;
if (wq->wq_next == NULL)
wq->wq_prev = NULL;
else
wq->wq_next->wq_prev = NULL;
continue;
}
if (did_use_queue)
ch_log(channel, "Write queue empty");
}
else
{
/* Wrote only buf[res] bytes, can't write more now. */
if (entry != NULL)
{
if (res > 0)
{
/* Remove the bytes that were written. */
mch_memmove(entry->wq_ga.ga_data,
(char *)entry->wq_ga.ga_data + res,
len - res);
entry->wq_ga.ga_len -= res;
}
buf = buf_arg;
len = len_arg;
}
else
{
buf += res;
len -= res;
}
ch_log(channel, "Adding %d bytes to the write queue", len);
/* Append the not written bytes of the argument to the write
* buffer. Limit entries to 4000 bytes. */
if (wq->wq_prev != NULL
&& wq->wq_prev->wq_ga.ga_len + len < 4000)
{
writeq_T *last = wq->wq_prev;
/* append to the last entry */
if (ga_grow(&last->wq_ga, len) == OK)
{
mch_memmove((char *)last->wq_ga.ga_data
+ last->wq_ga.ga_len,
buf, len);
last->wq_ga.ga_len += len;
}
}
else
{
writeq_T *last = (writeq_T *)alloc((int)sizeof(writeq_T));
if (last != NULL)
{
ch_log(channel, "Creating new entry");
last->wq_prev = wq->wq_prev;
last->wq_next = NULL;
if (wq->wq_prev == NULL)
wq->wq_next = last;
else
wq->wq_prev->wq_next = last;
wq->wq_prev = last;
ga_init2(&last->wq_ga, 1, 1000);
if (ga_grow(&last->wq_ga, len) == OK)
{
mch_memmove(last->wq_ga.ga_data, buf, len);
last->wq_ga.ga_len = len;
}
}
}
}
}
else if (res != len)
{
if (!channel->ch_error && fun != NULL)
{
ch_error(channel, "%s(): write failed", fun);
EMSG2(_("E631: %s(): write failed"), fun);
}
channel->ch_error = TRUE;
return FAIL;
}
channel->ch_error = FALSE;
return OK;
}
} }
/* /*
@@ -3873,13 +4027,7 @@ channel_select_check(int ret_in, void *rfds_in, void *wfds_in)
if (ret > 0 && in_part->ch_fd != INVALID_FD if (ret > 0 && in_part->ch_fd != INVALID_FD
&& FD_ISSET(in_part->ch_fd, wfds)) && FD_ISSET(in_part->ch_fd, wfds))
{ {
if (in_part->ch_buf_append) channel_write_input(channel);
{
if (in_part->ch_bufref.br_buf != NULL)
channel_write_new_lines(in_part->ch_bufref.br_buf);
}
else
channel_write_in(channel);
--ret; --ret;
} }
} }

View File

@@ -35,6 +35,7 @@ char_u *channel_read_block(channel_T *channel, ch_part_T part, int timeout);
void common_channel_read(typval_T *argvars, typval_T *rettv, int raw); void common_channel_read(typval_T *argvars, typval_T *rettv, int raw);
channel_T *channel_fd2channel(sock_T fd, ch_part_T *partp); channel_T *channel_fd2channel(sock_T fd, ch_part_T *partp);
void channel_handle_events(void); void channel_handle_events(void);
void channel_set_nonblock(channel_T *channel, ch_part_T part);
int channel_send(channel_T *channel, ch_part_T part, char_u *buf, int len, char *fun); int channel_send(channel_T *channel, ch_part_T part, char_u *buf, int len, char *fun);
void ch_expr_common(typval_T *argvars, typval_T *rettv, int eval); void ch_expr_common(typval_T *argvars, typval_T *rettv, int eval);
void ch_raw_common(typval_T *argvars, typval_T *rettv, int eval); void ch_raw_common(typval_T *argvars, typval_T *rettv, int eval);

View File

@@ -1196,6 +1196,7 @@ typedef struct partial_S partial_T;
typedef struct jobvar_S job_T; typedef struct jobvar_S job_T;
typedef struct readq_S readq_T; typedef struct readq_S readq_T;
typedef struct writeq_S writeq_T;
typedef struct jsonq_S jsonq_T; typedef struct jsonq_S jsonq_T;
typedef struct cbq_S cbq_T; typedef struct cbq_S cbq_T;
typedef struct channel_S channel_T; typedef struct channel_S channel_T;
@@ -1512,6 +1513,13 @@ struct readq_S
readq_T *rq_prev; readq_T *rq_prev;
}; };
struct writeq_S
{
garray_T wq_ga;
writeq_T *wq_next;
writeq_T *wq_prev;
};
struct jsonq_S struct jsonq_S
{ {
typval_T *jq_value; typval_T *jq_value;
@@ -1601,6 +1609,8 @@ typedef struct {
#endif #endif
int ch_block_write; /* for testing: 0 when not used, -1 when write int ch_block_write; /* for testing: 0 when not used, -1 when write
* does not block, 1 simulate blocking */ * does not block, 1 simulate blocking */
int ch_nonblocking; /* write() is non-blocking */
writeq_T ch_writeque; /* header for write queue */
cbq_T ch_cb_head; /* dummy node for per-request callbacks */ cbq_T ch_cb_head; /* dummy node for per-request callbacks */
char_u *ch_callback; /* call when a msg is not handled */ char_u *ch_callback; /* call when a msg is not handled */

View File

@@ -400,6 +400,10 @@ term_start(typval_T *argvar, jobopt_T *opt, int forceit)
vterm_get_size(term->tl_vterm, &term->tl_rows, &term->tl_cols); vterm_get_size(term->tl_vterm, &term->tl_rows, &term->tl_cols);
term_report_winsize(term, term->tl_rows, term->tl_cols); term_report_winsize(term, term->tl_rows, term->tl_cols);
/* Make sure we don't get stuck on sending keys to the job, it leads to
* a deadlock if the job is waiting for Vim to read. */
channel_set_nonblock(term->tl_job->jv_channel, PART_IN);
if (old_curbuf != NULL) if (old_curbuf != NULL)
{ {
--curbuf->b_nwindows; --curbuf->b_nwindows;

View File

@@ -450,3 +450,16 @@ func Test_terminal_list_args()
exe buf . 'bwipe!' exe buf . 'bwipe!'
call assert_equal("", bufname(buf)) call assert_equal("", bufname(buf))
endfunction endfunction
func Test_terminal_noblock()
let buf = term_start(&shell)
for c in ['a','b','c','d','e','f','g','h','i','j','k']
call term_sendkeys(buf, 'echo ' . repeat(c, 5000) . "\<cr>")
endfor
let g:job = term_getjob(buf)
call Stop_shell_in_terminal(buf)
call term_wait(buf)
bwipe
endfunc

View File

@@ -769,6 +769,8 @@ static char *(features[]) =
static int included_patches[] = static int included_patches[] =
{ /* Add new patch number below this line */ { /* Add new patch number below this line */
/**/
957,
/**/ /**/
956, 956,
/**/ /**/