forked from aniani/vim
patch 7.4.1669
Problem: When writing buffer lines to a pipe Vim may block. Solution: Avoid blocking, write more lines later.
This commit is contained in:
293
src/channel.c
293
src/channel.c
@@ -973,6 +973,7 @@ channel_set_job(channel_T *channel, job_T *job, jobopt_T *options)
|
||||
/* Special mode: send last-but-one line when appending a line
|
||||
* to the buffer. */
|
||||
in_part->ch_buffer->b_write_to_channel = TRUE;
|
||||
in_part->ch_buf_append = TRUE;
|
||||
in_part->ch_buf_top =
|
||||
in_part->ch_buffer->b_ml.ml_line_count + 1;
|
||||
}
|
||||
@@ -1047,6 +1048,8 @@ channel_set_options(channel_T *channel, jobopt_T *opt)
|
||||
channel->ch_part[PART_OUT].ch_timeout = opt->jo_out_timeout;
|
||||
if (opt->jo_set & JO_ERR_TIMEOUT)
|
||||
channel->ch_part[PART_ERR].ch_timeout = opt->jo_err_timeout;
|
||||
if (opt->jo_set & JO_BLOCK_WRITE)
|
||||
channel->ch_part[PART_IN].ch_block_write = 1;
|
||||
|
||||
if (opt->jo_set & JO_CALLBACK)
|
||||
{
|
||||
@@ -1192,10 +1195,79 @@ write_buf_line(buf_T *buf, linenr_T lnum, channel_T *channel)
|
||||
vim_free(p);
|
||||
}
|
||||
|
||||
/*
|
||||
* Return TRUE if "channel" can be written to.
|
||||
* Returns FALSE if the input is closed or the write would block.
|
||||
*/
|
||||
static int
|
||||
can_write_buf_line(channel_T *channel)
|
||||
{
|
||||
chanpart_T *in_part = &channel->ch_part[PART_IN];
|
||||
|
||||
if (in_part->ch_fd == INVALID_FD)
|
||||
return FALSE; /* pipe was closed */
|
||||
|
||||
/* for testing: block every other attempt to write */
|
||||
if (in_part->ch_block_write == 1)
|
||||
in_part->ch_block_write = -1;
|
||||
else if (in_part->ch_block_write == -1)
|
||||
in_part->ch_block_write = 1;
|
||||
|
||||
/* TODO: Win32 implementation, probably using WaitForMultipleObjects() */
|
||||
#ifndef WIN32
|
||||
{
|
||||
# if defined(HAVE_SELECT)
|
||||
struct timeval tval;
|
||||
fd_set wfds;
|
||||
int ret;
|
||||
|
||||
FD_ZERO(&wfds);
|
||||
FD_SET((int)in_part->ch_fd, &wfds);
|
||||
tval.tv_sec = 0;
|
||||
tval.tv_usec = 0;
|
||||
for (;;)
|
||||
{
|
||||
ret = select((int)in_part->ch_fd + 1, NULL, &wfds, NULL, &tval);
|
||||
# ifdef EINTR
|
||||
SOCK_ERRNO;
|
||||
if (ret == -1 && errno == EINTR)
|
||||
continue;
|
||||
# endif
|
||||
if (ret <= 0 || in_part->ch_block_write == 1)
|
||||
{
|
||||
if (ret > 0)
|
||||
ch_log(channel, "FAKED Input not ready for writing");
|
||||
else
|
||||
ch_log(channel, "Input not ready for writing");
|
||||
return FALSE;
|
||||
}
|
||||
break;
|
||||
}
|
||||
# else
|
||||
struct pollfd fds;
|
||||
|
||||
fds.fd = in_part->ch_fd;
|
||||
fds.events = POLLOUT;
|
||||
if (poll(&fds, 1, 0) <= 0)
|
||||
{
|
||||
ch_log(channel, "Input not ready for writing");
|
||||
return FALSE;
|
||||
}
|
||||
if (in_part->ch_block_write == 1)
|
||||
{
|
||||
ch_log(channel, "FAKED Input not ready for writing");
|
||||
return FALSE;
|
||||
}
|
||||
# endif
|
||||
}
|
||||
#endif
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
/*
|
||||
* Write any lines to the input channel.
|
||||
*/
|
||||
void
|
||||
static void
|
||||
channel_write_in(channel_T *channel)
|
||||
{
|
||||
chanpart_T *in_part = &channel->ch_part[PART_IN];
|
||||
@@ -1203,8 +1275,8 @@ channel_write_in(channel_T *channel)
|
||||
buf_T *buf = in_part->ch_buffer;
|
||||
int written = 0;
|
||||
|
||||
if (buf == NULL)
|
||||
return;
|
||||
if (buf == NULL || in_part->ch_buf_append)
|
||||
return; /* no buffer or using appending */
|
||||
if (!buf_valid(buf) || buf->b_ml.ml_mfp == NULL)
|
||||
{
|
||||
/* buffer was wiped out or unloaded */
|
||||
@@ -1215,10 +1287,8 @@ channel_write_in(channel_T *channel)
|
||||
for (lnum = in_part->ch_buf_top; lnum <= in_part->ch_buf_bot
|
||||
&& lnum <= buf->b_ml.ml_line_count; ++lnum)
|
||||
{
|
||||
if (in_part->ch_fd == INVALID_FD)
|
||||
/* pipe was closed */
|
||||
return;
|
||||
/* TODO: check if channel can be written to, do not block on write */
|
||||
if (!can_write_buf_line(channel))
|
||||
break;
|
||||
write_buf_line(buf, lnum, channel);
|
||||
++written;
|
||||
}
|
||||
@@ -1229,6 +1299,37 @@ channel_write_in(channel_T *channel)
|
||||
ch_logn(channel, "written %d lines to channel", written);
|
||||
|
||||
in_part->ch_buf_top = lnum;
|
||||
if (lnum > buf->b_ml.ml_line_count)
|
||||
{
|
||||
/* Writing is done, no longer need the buffer. */
|
||||
in_part->ch_buffer = NULL;
|
||||
ch_log(channel, "Finished writing all lines to channel");
|
||||
}
|
||||
else
|
||||
ch_logn(channel, "Still %d more lines to write",
|
||||
buf->b_ml.ml_line_count - lnum + 1);
|
||||
}
|
||||
|
||||
/*
|
||||
* Write any lines waiting to be written to a channel.
|
||||
*/
|
||||
void
|
||||
channel_write_any_lines()
|
||||
{
|
||||
channel_T *channel;
|
||||
|
||||
for (channel = first_channel; channel != NULL; channel = channel->ch_next)
|
||||
{
|
||||
chanpart_T *in_part = &channel->ch_part[PART_IN];
|
||||
|
||||
if (in_part->ch_buffer != NULL)
|
||||
{
|
||||
if (in_part->ch_buf_append)
|
||||
channel_write_new_lines(in_part->ch_buffer);
|
||||
else
|
||||
channel_write_in(channel);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -1248,15 +1349,16 @@ channel_write_new_lines(buf_T *buf)
|
||||
linenr_T lnum;
|
||||
int written = 0;
|
||||
|
||||
if (in_part->ch_buffer == buf)
|
||||
if (in_part->ch_buffer == buf && in_part->ch_buf_append)
|
||||
{
|
||||
if (in_part->ch_fd == INVALID_FD)
|
||||
/* pipe was closed */
|
||||
continue;
|
||||
continue; /* pipe was closed */
|
||||
found_one = TRUE;
|
||||
for (lnum = in_part->ch_buf_bot; lnum < buf->b_ml.ml_line_count;
|
||||
++lnum)
|
||||
{
|
||||
if (!can_write_buf_line(channel))
|
||||
break;
|
||||
write_buf_line(buf, lnum, channel);
|
||||
++written;
|
||||
}
|
||||
@@ -1265,6 +1367,9 @@ channel_write_new_lines(buf_T *buf)
|
||||
ch_logn(channel, "written line %d to channel", (int)lnum - 1);
|
||||
else if (written > 1)
|
||||
ch_logn(channel, "written %d lines to channel", written);
|
||||
if (lnum < buf->b_ml.ml_line_count)
|
||||
ch_logn(channel, "Still %d more lines to write",
|
||||
buf->b_ml.ml_line_count - lnum);
|
||||
|
||||
in_part->ch_buf_bot = lnum;
|
||||
}
|
||||
@@ -2379,6 +2484,57 @@ channel_free_all(void)
|
||||
/* Buffer size for reading incoming messages. */
|
||||
#define MAXMSGSIZE 4096
|
||||
|
||||
#if defined(HAVE_SELECT)
|
||||
/*
|
||||
* Add write fds where we are waiting for writing to be possible.
|
||||
*/
|
||||
static int
|
||||
channel_fill_wfds(int maxfd_arg, fd_set *wfds)
|
||||
{
|
||||
int maxfd = maxfd_arg;
|
||||
channel_T *ch;
|
||||
|
||||
for (ch = first_channel; ch != NULL; ch = ch->ch_next)
|
||||
{
|
||||
chanpart_T *in_part = &ch->ch_part[PART_IN];
|
||||
|
||||
if (in_part->ch_fd != INVALID_FD && in_part->ch_buffer != NULL)
|
||||
{
|
||||
FD_SET((int)in_part->ch_fd, wfds);
|
||||
if ((int)in_part->ch_fd >= maxfd)
|
||||
maxfd = (int)in_part->ch_fd + 1;
|
||||
}
|
||||
}
|
||||
return maxfd;
|
||||
}
|
||||
#else
|
||||
/*
|
||||
* Add write fds where we are waiting for writing to be possible.
|
||||
*/
|
||||
static int
|
||||
channel_fill_poll_write(int nfd_in, struct pollfd *fds)
|
||||
{
|
||||
int nfd = nfd_in;
|
||||
channel_T *ch;
|
||||
|
||||
for (ch = first_channel; ch != NULL; ch = ch->ch_next)
|
||||
{
|
||||
chanpart_T *in_part = &ch->ch_part[PART_IN];
|
||||
|
||||
if (in_part->ch_fd != INVALID_FD && in_part->ch_buffer != NULL)
|
||||
{
|
||||
in_part->ch_poll_idx = nfd;
|
||||
fds[nfd].fd = in_part->ch_fd;
|
||||
fds[nfd].events = POLLOUT;
|
||||
++nfd;
|
||||
}
|
||||
else
|
||||
in_part->ch_poll_idx = -1;
|
||||
}
|
||||
return nfd;
|
||||
}
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Check for reading from "fd" with "timeout" msec.
|
||||
* Return FAIL when there is nothing to read.
|
||||
@@ -2403,6 +2559,10 @@ channel_wait(channel_T *channel, sock_T fd, int timeout)
|
||||
if (PeekNamedPipe((HANDLE)fd, NULL, 0, NULL, &nread, NULL)
|
||||
&& nread > 0)
|
||||
return OK;
|
||||
|
||||
/* perhaps write some buffer lines */
|
||||
channel_write_any_lines();
|
||||
|
||||
sleep_time = deadline - GetTickCount();
|
||||
if (sleep_time <= 0)
|
||||
break;
|
||||
@@ -2422,31 +2582,56 @@ channel_wait(channel_T *channel, sock_T fd, int timeout)
|
||||
#if defined(HAVE_SELECT)
|
||||
struct timeval tval;
|
||||
fd_set rfds;
|
||||
int ret;
|
||||
fd_set wfds;
|
||||
int ret;
|
||||
int maxfd;
|
||||
|
||||
FD_ZERO(&rfds);
|
||||
FD_SET((int)fd, &rfds);
|
||||
tval.tv_sec = timeout / 1000;
|
||||
tval.tv_usec = (timeout % 1000) * 1000;
|
||||
for (;;)
|
||||
{
|
||||
ret = select((int)fd + 1, &rfds, NULL, NULL, &tval);
|
||||
FD_ZERO(&rfds);
|
||||
FD_SET((int)fd, &rfds);
|
||||
|
||||
/* Write lines to a pipe when a pipe can be written to. Need to
|
||||
* set this every time, some buffers may be done. */
|
||||
maxfd = (int)fd + 1;
|
||||
FD_ZERO(&wfds);
|
||||
maxfd = channel_fill_wfds(maxfd, &wfds);
|
||||
|
||||
ret = select(maxfd, &rfds, &wfds, NULL, &tval);
|
||||
# ifdef EINTR
|
||||
SOCK_ERRNO;
|
||||
if (ret == -1 && errno == EINTR)
|
||||
continue;
|
||||
# endif
|
||||
if (ret > 0)
|
||||
return OK;
|
||||
{
|
||||
if (FD_ISSET(fd, &rfds))
|
||||
return OK;
|
||||
channel_write_any_lines();
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
}
|
||||
#else
|
||||
struct pollfd fds;
|
||||
for (;;)
|
||||
{
|
||||
struct pollfd fds[MAX_OPEN_CHANNELS + 1];
|
||||
int nfd = 1;
|
||||
|
||||
fds.fd = fd;
|
||||
fds.events = POLLIN;
|
||||
if (poll(&fds, 1, timeout) > 0)
|
||||
return OK;
|
||||
fds[0].fd = fd;
|
||||
fds[0].events = POLLIN;
|
||||
nfd = channel_fill_poll_write(nfd, fds);
|
||||
if (poll(fds, nfd, timeout) > 0)
|
||||
{
|
||||
if (fds[0].revents & POLLIN)
|
||||
return OK;
|
||||
channel_write_any_lines();
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
}
|
||||
#endif
|
||||
}
|
||||
return FAIL;
|
||||
@@ -3010,10 +3195,12 @@ channel_poll_setup(int nfd_in, void *fds_in)
|
||||
{
|
||||
for (part = PART_SOCK; part < PART_IN; ++part)
|
||||
{
|
||||
if (channel->ch_part[part].ch_fd != INVALID_FD)
|
||||
chanpart_T *ch_part = &channel->ch_part[part];
|
||||
|
||||
if (ch_part->ch_fd != INVALID_FD)
|
||||
{
|
||||
channel->ch_part[part].ch_poll_idx = nfd;
|
||||
fds[nfd].fd = channel->ch_part[part].ch_fd;
|
||||
ch_part->ch_poll_idx = nfd;
|
||||
fds[nfd].fd = ch_part->ch_fd;
|
||||
fds[nfd].events = POLLIN;
|
||||
nfd++;
|
||||
}
|
||||
@@ -3022,6 +3209,8 @@ channel_poll_setup(int nfd_in, void *fds_in)
|
||||
}
|
||||
}
|
||||
|
||||
nfd = channel_fill_poll_write(nfd, fds);
|
||||
|
||||
return nfd;
|
||||
}
|
||||
|
||||
@@ -3035,19 +3224,35 @@ channel_poll_check(int ret_in, void *fds_in)
|
||||
channel_T *channel;
|
||||
struct pollfd *fds = fds_in;
|
||||
int part;
|
||||
int idx;
|
||||
chanpart_T *in_part;
|
||||
|
||||
for (channel = first_channel; channel != NULL; channel = channel->ch_next)
|
||||
{
|
||||
for (part = PART_SOCK; part < PART_IN; ++part)
|
||||
{
|
||||
int idx = channel->ch_part[part].ch_poll_idx;
|
||||
idx = channel->ch_part[part].ch_poll_idx;
|
||||
|
||||
if (ret > 0 && idx != -1 && fds[idx].revents & POLLIN)
|
||||
if (ret > 0 && idx != -1 && (fds[idx].revents & POLLIN))
|
||||
{
|
||||
channel_read(channel, part, "channel_poll_check");
|
||||
--ret;
|
||||
}
|
||||
}
|
||||
|
||||
in_part = &channel->ch_part[PART_IN];
|
||||
idx = in_part->ch_poll_idx;
|
||||
if (ret > 0 && idx != -1 && (fds[idx].revents & POLLOUT))
|
||||
{
|
||||
if (in_part->ch_buf_append)
|
||||
{
|
||||
if (in_part->ch_buffer != NULL)
|
||||
channel_write_new_lines(in_part->ch_buffer);
|
||||
}
|
||||
else
|
||||
channel_write_in(channel);
|
||||
--ret;
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
@@ -3056,14 +3261,15 @@ channel_poll_check(int ret_in, void *fds_in)
|
||||
|
||||
# if (!defined(WIN32) && defined(HAVE_SELECT)) || defined(PROTO)
|
||||
/*
|
||||
* The type of "rfds" is hidden to avoid problems with the function proto.
|
||||
* The "fd_set" type is hidden to avoid problems with the function proto.
|
||||
*/
|
||||
int
|
||||
channel_select_setup(int maxfd_in, void *rfds_in)
|
||||
channel_select_setup(int maxfd_in, void *rfds_in, void *wfds_in)
|
||||
{
|
||||
int maxfd = maxfd_in;
|
||||
channel_T *channel;
|
||||
fd_set *rfds = rfds_in;
|
||||
fd_set *wfds = wfds_in;
|
||||
int part;
|
||||
|
||||
for (channel = first_channel; channel != NULL; channel = channel->ch_next)
|
||||
@@ -3081,19 +3287,23 @@ channel_select_setup(int maxfd_in, void *rfds_in)
|
||||
}
|
||||
}
|
||||
|
||||
maxfd = channel_fill_wfds(maxfd, wfds);
|
||||
|
||||
return maxfd;
|
||||
}
|
||||
|
||||
/*
|
||||
* The type of "rfds" is hidden to avoid problems with the function proto.
|
||||
* The "fd_set" type is hidden to avoid problems with the function proto.
|
||||
*/
|
||||
int
|
||||
channel_select_check(int ret_in, void *rfds_in)
|
||||
channel_select_check(int ret_in, void *rfds_in, void *wfds_in)
|
||||
{
|
||||
int ret = ret_in;
|
||||
channel_T *channel;
|
||||
fd_set *rfds = rfds_in;
|
||||
fd_set *wfds = wfds_in;
|
||||
int part;
|
||||
chanpart_T *in_part;
|
||||
|
||||
for (channel = first_channel; channel != NULL; channel = channel->ch_next)
|
||||
{
|
||||
@@ -3107,6 +3317,20 @@ channel_select_check(int ret_in, void *rfds_in)
|
||||
--ret;
|
||||
}
|
||||
}
|
||||
|
||||
in_part = &channel->ch_part[PART_IN];
|
||||
if (ret > 0 && in_part->ch_fd != INVALID_FD
|
||||
&& FD_ISSET(in_part->ch_fd, wfds))
|
||||
{
|
||||
if (in_part->ch_buf_append)
|
||||
{
|
||||
if (in_part->ch_buffer != NULL)
|
||||
channel_write_new_lines(in_part->ch_buffer);
|
||||
}
|
||||
else
|
||||
channel_write_in(channel);
|
||||
--ret;
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
@@ -3608,6 +3832,13 @@ get_job_options(typval_T *tv, jobopt_T *opt, int supported)
|
||||
return FAIL;
|
||||
}
|
||||
}
|
||||
else if (STRCMP(hi->hi_key, "block_write") == 0)
|
||||
{
|
||||
if (!(supported & JO_BLOCK_WRITE))
|
||||
break;
|
||||
opt->jo_set |= JO_BLOCK_WRITE;
|
||||
opt->jo_block_write = get_tv_number(item);
|
||||
}
|
||||
else
|
||||
break;
|
||||
--todo;
|
||||
@@ -3827,8 +4058,8 @@ job_start(typval_T *argvars)
|
||||
clear_job_options(&opt);
|
||||
opt.jo_mode = MODE_NL;
|
||||
if (get_job_options(&argvars[1], &opt,
|
||||
JO_MODE_ALL + JO_CB_ALL + JO_TIMEOUT_ALL
|
||||
+ JO_STOPONEXIT + JO_EXIT_CB + JO_OUT_IO) == FAIL)
|
||||
JO_MODE_ALL + JO_CB_ALL + JO_TIMEOUT_ALL + JO_STOPONEXIT
|
||||
+ JO_EXIT_CB + JO_OUT_IO + JO_BLOCK_WRITE) == FAIL)
|
||||
return job;
|
||||
|
||||
/* Check that when io is "file" that there is a file name. */
|
||||
|
Reference in New Issue
Block a user