summaryrefslogtreecommitdiff
path: root/daemons/lvmdbusd/lvm_shell_proxy.py.in
blob: b8c8fa565b5ef375cf0f72db4107c42475dcf344 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
#!@PYTHON3@

# Copyright (C) 2015-2016 Red Hat, Inc. All rights reserved.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions
# of the GNU General Public License v.2.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Copyright 2015-2016, Vratislav Podzimek <vpodzime@redhat.com>

import subprocess
import shlex
import os
import pty
import sys
import tempfile
import time
import threading
import select

try:
	import simplejson as json
except ImportError:
	import json


import lvmdbusd.cfg as cfg
from lvmdbusd.utils import log_debug, log_error, add_no_notify, make_non_block,\
			read_decoded, extract_stack_trace, LvmBug, get_error_msg

# Prompt text of the interactive lvm shell.  _read_response() treats stdout
# ending with this string as the sign that the shell is ready for the next
# command.
SHELL_PROMPT = "lvm> "


def _quote_arg(arg):
	if len(shlex.split(arg)) > 1:
		return '"%s"' % arg
	else:
		return arg


class LVMShellProxy(object):
	"""
	Run lvm commands through a single, long-lived interactive lvm shell.

	The constructor starts cfg.LVM_CMD with stdin, stdout and stderr attached
	to pseudo terminals and passes it the write end of a FIFO through the
	LVM_REPORT_FD environment variable.  call_lvm() writes a command line to
	the shell and parses the JSON that arrives on the read end of that FIFO.
	"""

	def _read_response(self, no_output=False):
		"""
		Read from the shell until the prompt is back and, unless no_output
		is set, the report FD holds a complete and valid JSON record.

		:param no_output: When True, stop as soon as stdout ends with the
			shell prompt and do not expect any report data.
		:returns: Tuple (stdout, report, stderr) where stdout and stderr are
			the accumulated text and report is the parsed JSON, or an empty
			dict when nothing was parsed.
		:raises LvmBug: The prompt came back but the report stayed empty or
			unparsable after the extra read passes were used up.
		:raises Exception: The lvm shell process exited while reading; args
			are (returncode, stderr text).
		:raises SystemExit: cfg.run.value became 0 before reading finished.
		"""
		stdout = ""
		report = ""
		stderr = ""
		keep_reading = True
		extra_passes = 3
		report_json = {}
		prev_report_len = 0

		# Try reading from all FDs to prevent one from filling up and causing
		# a hang.  Keep reading until we get the prompt back and the report
		# FD contains valid JSON.
		while keep_reading and cfg.run.value != 0:
			try:
				rd_fd = [
					self.parent_stdout_fd,
					self.report_stream.fileno(),
					self.parent_stderr_fd]
				# Wake up at least every 2 seconds so the loop condition
				# (cfg.run.value) gets re-evaluated.
				ready = select.select(rd_fd, [], [], 2)

				for r in ready[0]:
					if r == self.parent_stdout_fd:
						stdout += "".join(self.parent_stdout.readlines())
					elif r == self.report_stream.fileno():
						report += read_decoded(self.report_stream)
					elif r == self.parent_stderr_fd:
						stderr += "".join(self.parent_stderr.readlines())

				# Check to see if the lvm process died on us
				if self.lvm_shell.poll() is not None:
					raise Exception(self.lvm_shell.returncode, "%s" % stderr)

				if stdout.endswith(SHELL_PROMPT):
					if no_output:
						keep_reading = False
					else:
						cur_report_len = len(report)
						if cur_report_len != 0:
							# Only bother to parse if we have more data
							if prev_report_len != cur_report_len:
								prev_report_len = cur_report_len
								# Parse the JSON if it's good we are done,
								# if not we will try to read some more.
								try:
									report_json = json.loads(report)
									keep_reading = False
								except ValueError:
									pass

						if keep_reading:
							extra_passes -= 1
							if extra_passes <= 0:
								if len(report):
									raise LvmBug("Invalid json: %s" %
														report)
								else:
									raise LvmBug(
										"lvm returned no JSON output!")
			except Exception as e:
				log_error("While reading from lvm shell we encountered an error %s" % str(e))
				log_error("stdout= %s\nstderr= %s\n" % (stdout, stderr))
				if self.lvm_shell.poll() is not None:
					log_error("Underlying lvm shell process unexpectedly exited: %d" % self.lvm_shell.returncode)
				else:
					log_error("Underlying lvm shell process is still present!")
				# Re-raise the exception we just logged, traceback intact.
				raise

		if keep_reading and cfg.run.value == 0:
			# We didn't complete as we are shutting down
			# Try to clean up lvm shell process
			log_debug("exiting lvm shell as we are shutting down")
			self.exit_shell()
			raise SystemExit

		return stdout, report_json, stderr

	def _write_cmd(self, cmd):
		"""
		Write cmd to the shell's stdin and flush it.

		:param cmd: Text to send; callers in this class end it with a newline.
		"""
		self.parent_stdin.write(cmd)
		self.parent_stdin.flush()

	def __init__(self):
		"""
		Start the lvm shell child process and wait for its first prompt.

		:raises LvmBug: The shell wrote something to stderr before the first
			prompt was read.
		"""
		# Create a temp directory
		tmp_dir = tempfile.mkdtemp(prefix="lvmdbus_")
		tmp_file = "%s/lvmdbus_report" % (tmp_dir)

		# Create a lock so that we don't step on each other when we are waiting for a command
		# to finish and some other request comes in concurrently, like to exit the shell.
		self.shell_lock = threading.RLock()

		try:
			# Create a fifo for the report output
			os.mkfifo(tmp_file, 0o600)

			# Open the fifo for use to read and for lvm child process to write to.
			self.report_fd = os.open(tmp_file, os.O_NONBLOCK)
			self.report_stream = os.fdopen(self.report_fd, 'rb', 0)
			lvm_fd = os.open(tmp_file, os.O_WRONLY)

			# Set up the environment for using our own socket for reporting and disable the abort
			# logic if lvm logs too much, which easily happens when utilizing the lvm shell.
			local_env = {"LC_ALL": "C", "LVM_REPORT_FD": "%s" % lvm_fd, "LVM_COMMAND_PROFILE": "lvmdbusd",
						 "LVM_LOG_FILE_MAX_LINES": "0"}

			# If any env variables contain LVM we will propagate them too
			for k, v in os.environ.items():
				if "LVM" in k:
					local_env[k] = v

			self.parent_stdin_fd, child_stdin_fd = pty.openpty()
			self.parent_stdout_fd, child_stdout_fd = pty.openpty()
			self.parent_stderr_fd, child_stderr_fd = pty.openpty()
			self.parent_stdin = os.fdopen(self.parent_stdin_fd, "w")
			self.parent_stdout = os.fdopen(self.parent_stdout_fd, "r")
			self.parent_stderr = os.fdopen(self.parent_stderr_fd, "r")

			# run the lvm shell
			self.lvm_shell = subprocess.Popen(
				[cfg.LVM_CMD],
				stdin=child_stdin_fd,
				stdout=child_stdout_fd, env=local_env,
				stderr=child_stderr_fd, close_fds=True,
				pass_fds=(lvm_fd,), shell=False)

			make_non_block(self.parent_stdout_fd)
			make_non_block(self.parent_stderr_fd)

			# Close our copies of the child FDs there were created with the fork, we don't need them open.
			os.close(lvm_fd)
			os.close(child_stdin_fd)
			os.close(child_stdout_fd)
			os.close(child_stderr_fd)

			# wait for the first prompt
			log_debug("waiting for first prompt...")
			errors = self._read_response(no_output=True)[2]
			if errors:
				raise LvmBug(errors)
			log_debug("lvm prompt read!!!")
		finally:
			# These will get deleted when the FD count goes to zero, so we
			# can be sure to clean up correctly no matter how we finish.
			# The fifo may not exist if we failed before creating it.
			if os.path.exists(tmp_file):
				os.unlink(tmp_file)
			os.rmdir(tmp_dir)

	def _get_last_log(self):
		"""
		Issue 'lastlog' to the shell and return get_error_msg() applied to
		the resulting report.

		Precondition, lock is held.
		"""
		self._write_cmd('lastlog\n')
		report_json = self._read_response()[1]
		return get_error_msg(report_json)

	def call_lvm(self, argv, debug=False):
		"""
		Run one lvm command in the shell.

		:param argv: Sequence of command words; argv[0] is the command name.
		:param debug: When True the command, exit code and error message are
			logged even on success.
		:returns: Tuple (rc, report_json, error_msg) where rc is 0 on success
			and 1 otherwise.
		:raises Exception: The lvm shell process has already exited; args are
			(returncode, message).
		"""
		rc = 1
		error_msg = ""

		# poll() returns the exit status once the child is gone, and that
		# status can be 0, so compare against None rather than truthiness.
		if self.lvm_shell.poll() is not None:
			raise Exception(
				self.lvm_shell.returncode,
				"Underlying lvm shell process is not present!")

		argv = add_no_notify(argv)

		# create the command string
		cmd = " ".join(_quote_arg(arg) for arg in argv)
		cmd += "\n"

		# run the command by writing it to the shell's STDIN
		with self.shell_lock:
			self._write_cmd(cmd)

			# read everything from the STDOUT to the next prompt
			stdout, report_json, stderr = self._read_response()

			# Parse the report to see what happened
			if 'log' in report_json:
				ret_code = int(report_json['log'][-1]['log_ret_code'])
				# If we have an exported vg we get a log_ret_code == 5 when
				# we do a 'fullreport'
				# Note: 0 == error
				if (ret_code == 1) or (ret_code == 5 and argv[0] == 'fullreport'):
					rc = 0
				else:
					# Depending on where lvm fails the command, it may not have anything
					# to report for "lastlog", so we need to check for a message in the
					# report json too.
					error_msg = self._get_last_log()
					if error_msg is None:
						error_msg = get_error_msg(report_json)
						if error_msg is None:
							error_msg = 'No error reason provided! (missing "log" section)'
			else:
				# Without a "log" section there is no return code to inspect;
				# report the failure with a reason instead of an empty string.
				error_msg = 'No error reason provided! (missing "log" section)'

		if debug or rc != 0:
			log_error(("CMD= %s" % cmd))
			log_error(("EC= %d" % rc))
			log_error(("ERROR_MSG=\n %s\n" % error_msg))

		return rc, report_json, error_msg

	def exit_shell(self):
		"""
		Ask the shell to exit and wait up to one second for it to do so.

		Any exception is logged rather than raised.  self.lvm_shell is set to
		None only when the child exited within the timeout.
		"""
		with self.shell_lock:
			try:
				if self.lvm_shell is not None:
					self._write_cmd('exit\n')
					self.lvm_shell.wait(1)
					self.lvm_shell = None
			except Exception as _e:
				log_error("exit_shell: %s" % (str(_e)))

	def __del__(self):
		"""
		Reap the child process, sending SIGTERM if it has not exited.
		"""
		# Note: When we are shutting down the daemon and the main process has already exited
		# and this gets called we have a limited set of things we can do, like we cannot call
		# log_error as _common_log is None!!!
		#
		# __init__ can fail before lvm_shell is assigned, so do not assume
		# the attribute exists.
		shell = getattr(self, "lvm_shell", None)
		if shell is not None:
			try:
				shell.wait(1)
			except subprocess.TimeoutExpired:
				print("lvm shell child process did not exit as instructed, sending SIGTERM")
				cfg.ignore_sigterm = True
				shell.terminate()
				child_exit_code = shell.wait(1)
				print("lvm shell process exited with %d" % child_exit_code)


if __name__ == "__main__":
	# Manual test driver: with the single argument "bisect" just start and
	# stop the shell, otherwise offer a small interactive prompt that forwards
	# each line to the lvm shell and prints the result and elapsed time.
	print("USING LVM BINARY: %s " % cfg.LVM_CMD)

	try:
		shell = LVMShellProxy()

		if len(sys.argv) > 1 and sys.argv[1] == "bisect":
			shell.exit_shell()
		else:
			try:
				while True:
					in_line = input("lvm> ")

					# An empty line ends the session
					if not in_line:
						break

					if in_line == "exit":
						shell.exit_shell()
						sys.exit(0)

					start = time.time()
					ret, out, err = shell.call_lvm(in_line.split())
					elapsed = time.time() - start

					print(("RC: %d" % ret))
					print(("OUT:\n%s" % out))
					print(("ERR:\n%s" % err))

					print("Command     = %f seconds" % elapsed)
			except (KeyboardInterrupt, EOFError):
				# Ctrl-C or end of input simply ends the session
				pass
	except Exception as e:
		log_error("main process exiting on exception!\n%s" % extract_stack_trace(e))
		sys.exit(1)

	sys.exit(0)