mirror of
https://github.com/vim/vim.git
synced 2025-07-26 11:04:33 -04:00
270 lines
12 KiB
Python
270 lines
12 KiB
Python
#!/usr/bin/python
|
|
#
|
|
# Server that will accept connections from a Vim channel.
|
|
# Used by test_channel.vim.
|
|
#
|
|
# This requires Python 2.6 or later.
|
|
|
|
from __future__ import print_function
|
|
import json
|
|
import socket
|
|
import sys
|
|
import time
|
|
import threading
|
|
|
|
try:
|
|
# Python 3
|
|
import socketserver
|
|
except ImportError:
|
|
# Python 2
|
|
import SocketServer as socketserver
|
|
|
|
class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
|
|
|
|
def handle(self):
|
|
print("=== socket opened ===")
|
|
while True:
|
|
try:
|
|
received = self.request.recv(4096).decode('utf-8')
|
|
except socket.error:
|
|
print("=== socket error ===")
|
|
break
|
|
except IOError:
|
|
print("=== socket closed ===")
|
|
break
|
|
if received == '':
|
|
print("=== socket closed ===")
|
|
break
|
|
print("received: {0}".format(received))
|
|
|
|
# We may receive two messages at once. Take the part up to the
|
|
# newline, which should be after the matching "]".
|
|
todo = received
|
|
while todo != '':
|
|
splitidx = todo.find('\n')
|
|
if splitidx < 0:
|
|
used = todo
|
|
todo = ''
|
|
else:
|
|
used = todo[:splitidx]
|
|
todo = todo[splitidx + 1:]
|
|
if used != received:
|
|
print("using: {0}".format(used))
|
|
|
|
try:
|
|
decoded = json.loads(used)
|
|
except ValueError:
|
|
print("json decoding failed")
|
|
decoded = [-1, '']
|
|
|
|
# Send a response if the sequence number is positive.
|
|
if decoded[0] >= 0:
|
|
if decoded[1] == 'hello!':
|
|
# simply send back a string
|
|
response = "got it"
|
|
elif decoded[1].startswith("echo "):
|
|
# send back the argument
|
|
response = decoded[1][5:]
|
|
elif decoded[1] == 'make change':
|
|
# Send two ex commands at the same time, before
|
|
# replying to the request.
|
|
cmd = '["ex","call append(\\"$\\",\\"added1\\")"]'
|
|
cmd += '["ex","call append(\\"$\\",\\"added2\\")"]'
|
|
print("sending: {0}".format(cmd))
|
|
self.request.sendall(cmd.encode('utf-8'))
|
|
response = "ok"
|
|
elif decoded[1] == 'bad command':
|
|
cmd = '["ex","foo bar"]'
|
|
print("sending: {0}".format(cmd))
|
|
self.request.sendall(cmd.encode('utf-8'))
|
|
response = "ok"
|
|
elif decoded[1] == 'do normal':
|
|
# Send a normal command.
|
|
cmd = '["normal","G$s more\u001b"]'
|
|
print("sending: {0}".format(cmd))
|
|
self.request.sendall(cmd.encode('utf-8'))
|
|
response = "ok"
|
|
elif decoded[1] == 'eval-works':
|
|
# Send an eval request. We ignore the response.
|
|
cmd = '["expr","\\"foo\\" . 123", -1]'
|
|
print("sending: {0}".format(cmd))
|
|
self.request.sendall(cmd.encode('utf-8'))
|
|
response = "ok"
|
|
elif decoded[1] == 'eval-special':
|
|
# Send an eval request. We ignore the response.
|
|
cmd = '["expr","\\"foo\x7f\x10\x01bar\\"", -2]'
|
|
print("sending: {0}".format(cmd))
|
|
self.request.sendall(cmd.encode('utf-8'))
|
|
response = "ok"
|
|
elif decoded[1] == 'eval-getline':
|
|
# Send an eval request. We ignore the response.
|
|
cmd = '["expr","getline(3)", -3]'
|
|
print("sending: {0}".format(cmd))
|
|
self.request.sendall(cmd.encode('utf-8'))
|
|
response = "ok"
|
|
elif decoded[1] == 'eval-fails':
|
|
# Send an eval request that will fail.
|
|
cmd = '["expr","xxx", -4]'
|
|
print("sending: {0}".format(cmd))
|
|
self.request.sendall(cmd.encode('utf-8'))
|
|
response = "ok"
|
|
elif decoded[1] == 'eval-error':
|
|
# Send an eval request that works but the result can't
|
|
# be encoded.
|
|
cmd = '["expr","function(\\"tr\\")", -5]'
|
|
print("sending: {0}".format(cmd))
|
|
self.request.sendall(cmd.encode('utf-8'))
|
|
response = "ok"
|
|
elif decoded[1] == 'eval-bad':
|
|
# Send an eval request missing the third argument.
|
|
cmd = '["expr","xxx"]'
|
|
print("sending: {0}".format(cmd))
|
|
self.request.sendall(cmd.encode('utf-8'))
|
|
response = "ok"
|
|
elif decoded[1] == 'malformed1':
|
|
cmd = '["ex",":"]wrong!["ex","smi"]'
|
|
print("sending: {0}".format(cmd))
|
|
self.request.sendall(cmd.encode('utf-8'))
|
|
response = "ok"
|
|
# Need to wait for Vim to give up, otherwise it
|
|
# sometimes fails on OS X.
|
|
time.sleep(0.2)
|
|
elif decoded[1] == 'malformed2':
|
|
cmd = '"unterminated string'
|
|
print("sending: {0}".format(cmd))
|
|
self.request.sendall(cmd.encode('utf-8'))
|
|
response = "ok"
|
|
# Need to wait for Vim to give up, otherwise the double
|
|
# quote in the "ok" response terminates the string.
|
|
time.sleep(0.2)
|
|
elif decoded[1] == 'malformed3':
|
|
cmd = '["ex","missing ]"'
|
|
print("sending: {0}".format(cmd))
|
|
self.request.sendall(cmd.encode('utf-8'))
|
|
response = "ok"
|
|
# Need to wait for Vim to give up, otherwise the ]
|
|
# in the "ok" response terminates the list.
|
|
time.sleep(0.2)
|
|
elif decoded[1] == 'split':
|
|
cmd = '["ex","let '
|
|
print("sending: {0}".format(cmd))
|
|
self.request.sendall(cmd.encode('utf-8'))
|
|
time.sleep(0.01)
|
|
cmd = 'g:split = 123"]'
|
|
print("sending: {0}".format(cmd))
|
|
self.request.sendall(cmd.encode('utf-8'))
|
|
response = "ok"
|
|
elif decoded[1] == 'an expr':
|
|
# Send an expr request.
|
|
cmd = '["expr","setline(\\"$\\", [\\"one\\",\\"two\\",\\"three\\"])"]'
|
|
print("sending: {0}".format(cmd))
|
|
self.request.sendall(cmd.encode('utf-8'))
|
|
response = "ok"
|
|
elif decoded[1] == 'call-func':
|
|
cmd = '["call","MyFunction",[1,2,3], 0]'
|
|
print("sending: {0}".format(cmd))
|
|
self.request.sendall(cmd.encode('utf-8'))
|
|
response = "ok"
|
|
elif decoded[1] == 'redraw':
|
|
cmd = '["redraw",""]'
|
|
print("sending: {0}".format(cmd))
|
|
self.request.sendall(cmd.encode('utf-8'))
|
|
response = "ok"
|
|
elif decoded[1] == 'redraw!':
|
|
cmd = '["redraw","force"]'
|
|
print("sending: {0}".format(cmd))
|
|
self.request.sendall(cmd.encode('utf-8'))
|
|
response = "ok"
|
|
elif decoded[1] == 'empty-request':
|
|
cmd = '[]'
|
|
print("sending: {0}".format(cmd))
|
|
self.request.sendall(cmd.encode('utf-8'))
|
|
response = "ok"
|
|
elif decoded[1] == 'eval-result':
|
|
# Send back the last received eval result.
|
|
response = last_eval
|
|
elif decoded[1] == 'call me':
|
|
cmd = '[0,"we called you"]'
|
|
print("sending: {0}".format(cmd))
|
|
self.request.sendall(cmd.encode('utf-8'))
|
|
response = "ok"
|
|
elif decoded[1] == 'call me again':
|
|
cmd = '[0,"we did call you"]'
|
|
print("sending: {0}".format(cmd))
|
|
self.request.sendall(cmd.encode('utf-8'))
|
|
response = ""
|
|
elif decoded[1] == 'send zero':
|
|
cmd = '[0,"zero index"]'
|
|
print("sending: {0}".format(cmd))
|
|
self.request.sendall(cmd.encode('utf-8'))
|
|
response = "sent zero"
|
|
elif decoded[1] == 'close me':
|
|
print("closing")
|
|
self.request.close()
|
|
response = ""
|
|
elif decoded[1] == 'wait a bit':
|
|
time.sleep(0.2)
|
|
response = "waited"
|
|
elif decoded[1] == '!quit!':
|
|
# we're done
|
|
self.server.shutdown()
|
|
return
|
|
elif decoded[1] == '!crash!':
|
|
# Crash!
|
|
42 / 0
|
|
else:
|
|
response = "what?"
|
|
|
|
if response == "":
|
|
print("no response")
|
|
else:
|
|
encoded = json.dumps([decoded[0], response])
|
|
print("sending: {0}".format(encoded))
|
|
self.request.sendall(encoded.encode('utf-8'))
|
|
|
|
# Negative numbers are used for "eval" responses.
|
|
elif decoded[0] < 0:
|
|
last_eval = decoded
|
|
|
|
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
|
|
pass
|
|
|
|
def writePortInFile(port):
|
|
# Write the port number in Xportnr, so that the test knows it.
|
|
f = open("Xportnr", "w")
|
|
f.write("{0}".format(port))
|
|
f.close()
|
|
|
|
if __name__ == "__main__":
|
|
HOST, PORT = "localhost", 0
|
|
|
|
# Wait half a second before opening the port to test waittime in ch_open().
|
|
# We do want to get the port number, get that first. We cannot open the
|
|
# socket, guess a port is free.
|
|
if len(sys.argv) >= 2 and sys.argv[1] == 'delay':
|
|
PORT = 13684
|
|
writePortInFile(PORT)
|
|
|
|
print("Wait for it...")
|
|
time.sleep(0.5)
|
|
|
|
server = ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler)
|
|
ip, port = server.server_address
|
|
|
|
# Start a thread with the server. That thread will then start a new thread
|
|
# for each connection.
|
|
server_thread = threading.Thread(target=server.serve_forever)
|
|
server_thread.start()
|
|
|
|
writePortInFile(port)
|
|
|
|
print("Listening on port {0}".format(port))
|
|
|
|
# Main thread terminates, but the server continues running
|
|
# until server.shutdown() is called.
|
|
try:
|
|
while server_thread.isAlive():
|
|
server_thread.join(1)
|
|
except (KeyboardInterrupt, SystemExit):
|
|
server.shutdown()
|