Coverage for cogapp/cogapp.py: 49.01%

500 statements  

« prev     ^ index     » next       coverage.py v7.2.5, created at 2023-04-30 06:39 -0400

1""" Cog content generation tool. 

2""" 

3 

4import copy 

5import getopt 

6import glob 

7import hashlib 

8import io 

9import linecache 

10import os 

11import re 

12import shlex 

13import sys 

14import traceback 

15import types 

16 

17from .whiteutils import commonPrefix, reindentBlock, whitePrefix 

18 

# Version string; printed as "Cog version ..." when the -v option is given.
19__version__ = "4.0.0.dev2" 

20 

# Command-line help text; written to stderr when -h or -? is on the command line.
21usage = """\ 

22cog - generate content with inlined Python code. 

23 

24cog [OPTIONS] [INFILE | @FILELIST] ... 

25 

26INFILE is the name of an input file, '-' will read from stdin. 

27FILELIST is the name of a text file containing file names or 

28other @FILELISTs. 

29 

30OPTIONS: 

31 -c Checksum the output to protect it against accidental change. 

32 -d Delete the generator code from the output file. 

33 -D name=val Define a global string available to your generator code. 

34 -e Warn if a file has no cog code in it. 

35 -I PATH Add PATH to the list of directories for data files and modules. 

36 -n ENCODING Use ENCODING when reading and writing files. 

37 -o OUTNAME Write the output to OUTNAME. 

38 -p PROLOGUE Prepend the generator source with PROLOGUE. Useful to insert an 

39 import line. Example: -p "import math" 

40 -P Use print() instead of cog.outl() for code output. 

41 -r Replace the input file with the output. 

42 -s STRING Suffix all generated output lines with STRING. 

43 -U Write the output with Unix newlines (only LF line-endings). 

44 -w CMD Use CMD if the output file needs to be made writable. 

45 A %s in the CMD will be filled with the filename. 

46 -x Excise all the generated output without running the generators. 

47 -z The end-output marker can be omitted, and is assumed at eof. 

48 -v Print the version of cog and exit. 

49 --check Check that the files would not change if run again. 

50 --markers='START END END-OUTPUT' 

51 The patterns surrounding cog inline instructions. Should 

52 include three values separated by spaces, the start, end, 

53 and end-output markers. Defaults to '[[[cog ]]] [[[end]]]'. 

54 --verbosity=VERBOSITY 

55 Control the amount of output. 2 (the default) lists all files, 

56 1 lists only changed files, 0 lists no files. 

57 -h Print this help. 

58""" 

59 

class CogError(Exception):
    """Base class for every exception that cog raises on its own behalf.

    Arguments:
        msg: the error message.
        file: optional name of the file being processed.  When it is
            non-empty the message is prefixed with ``file(line): ``.
        line: line number used in that prefix; ignored without ``file``.
    """

    def __init__(self, msg, file='', line=0):
        text = f"{file}({line}): {msg}" if file else msg
        super().__init__(text)


class CogUsageError(CogError):
    """The command-line arguments given to cog were used incorrectly."""


class CogInternalError(CogError):
    """A mistake in the coding of cog itself.  Should never happen."""


class CogGeneratedError(CogError):
    """An error raised on purpose by a user's cog generator."""


class CogUserException(CogError):
    """An exception that was caught while running a user's cog generator.

    The argument is the traceback message to print.
    """


class CogCheckFailed(CogError):
    """A --check run failed."""

94 

class Redirectable:
    """An object with its own stdout and stderr files.

    Both streams start out as whatever ``sys.stdout`` and ``sys.stderr``
    are when the object is created, and can be replaced independently
    with setOutput().
    """

    def __init__(self):
        self.stdout = sys.stdout
        self.stderr = sys.stderr

    def setOutput(self, stdout=None, stderr=None):
        """Assign new files for standard out and/or standard error.

        A stream left as None is not changed.  The check is made against
        None rather than by truthiness, so a file-like object that happens
        to evaluate as false is still honoured.
        """
        if stdout is not None:
            self.stdout = stdout
        if stderr is not None:
            self.stderr = stderr

    def prout(self, s, end="\n"):
        """Print ``s`` to this object's stdout, followed by ``end``."""
        print(s, file=self.stdout, end=end)

    def prerr(self, s, end="\n"):
        """Print ``s`` to this object's stderr, followed by ``end``."""
        print(s, file=self.stderr, end=end)

115 

116 

class CogGenerator(Redirectable):
    """One generator: a chunk of Python code pulled out of a source file."""

    def __init__(self, options=None):
        super().__init__()
        self.markers = []
        self.lines = []
        self.options = options or CogOptions()

    def parseMarker(self, l):
        """Record a marker line belonging to this generator."""
        self.markers.append(l)

    def parseLine(self, l):
        """Record one line of generator code, without surrounding newlines."""
        self.lines.append(l.strip('\n'))

    def getCode(self):
        """Return the generator's executable Python code.

        When the marker lines and the code lines all share a prefix
        (end-of-line comment characters, for example), that prefix is
        removed from every one of them first.
        """
        shared = commonPrefix(self.markers + self.lines)
        if shared:
            self.markers = [marker.replace(shared, '', 1) for marker in self.markers]
            self.lines = [line.replace(shared, '', 1) for line in self.lines]

        return reindentBlock(self.lines, '')

    def evaluate(self, cog, globals, fname):
        """Run the generator code and return the text it produced.

        Arguments:
            cog: the Cog engine; supplies ``cogmodulename`` and ``cogmodule``.
            globals: the globals dict the code is run in.
            fname: name given to the compiled code (converted with str()).

        Returns the output re-indented to the whitespace prefix of the
        marker lines, or '' when the generator contains no code.

        Raises CogUserException carrying a formatted traceback when the
        code raises anything other than a CogError; CogErrors pass through.
        """
        # The output is indented like the markers that surround the code.
        outPrefix = whitePrefix(self.markers)

        source = self.getCode()
        if not source:
            return ''

        prologue = "import " + cog.cogmodulename + " as cog\n"
        if self.options.sPrologue:
            prologue += self.options.sPrologue + '\n'
        code = compile(prologue + source, str(fname), 'exec')

        # Make sure the "cog" module has our state.
        module = cog.cogmodule
        module.msg = self.msg
        module.out = self.out
        module.outl = self.outl
        module.error = self.error

        savedStdout = sys.stdout
        if self.options.bPrintOutput:
            # With -P the generator's print() output is what gets collected.
            capturedStdout = io.StringIO()
            sys.stdout = capturedStdout

        self.outstring = ''
        try:
            eval(code, globals)
        except CogError:
            raise
        except:
            # Deliberately catches everything: whatever the user's code
            # raised is reported as a CogUserException with a traceback
            # whose cog frames point back at the original source.
            excType, excValue, tb = sys.exc_info()
            frames = (tuple(frame) for frame in traceback.extract_tb(tb.tb_next))
            frames = find_cog_source(frames, prologue)
            report = "".join(traceback.format_list(frames))
            report += f"{excType.__name__}: {excValue}"
            raise CogUserException(report)
        finally:
            sys.stdout = savedStdout

        if self.options.bPrintOutput:
            self.outstring = capturedStdout.getvalue()

        # The last output line has to end with a newline, otherwise it
        # would be joined to the end-output line and cog would no longer
        # be idempotent.
        if self.outstring and not self.outstring.endswith('\n'):
            self.outstring += '\n'

        return reindentBlock(self.outstring, outPrefix)

    def msg(self, s):
        """The cog.msg function: print a message on this object's stdout."""
        self.prout("Message: "+s)

    def out(self, sOut='', dedent=False, trimblanklines=False):
        """The cog.out function: append ``sOut`` to the collected output.

        With ``trimblanklines``, a multi-line string loses a blank first
        line and a blank last line and is terminated with a newline.
        With ``dedent``, the text is passed through reindentBlock().
        """
        if trimblanklines and '\n' in sOut:
            pieces = sOut.split('\n')
            if not pieces[0].strip():
                pieces.pop(0)
            if pieces and not pieces[-1].strip():
                pieces.pop()
            sOut = '\n'.join(pieces)+'\n'
        if dedent:
            sOut = reindentBlock(sOut)
        self.outstring += sOut

    def outl(self, sOut='', **kw):
        """The cog.outl function: like out(), then a newline."""
        self.out(sOut, **kw)
        self.out('\n')

    def error(self, msg='Error raised by cog generator.'):
        """The cog.error function.

        Generators can call this instead of raising standard Python
        errors; the error is then displayed without a scary Python
        traceback.
        """
        raise CogGeneratedError(msg)

224 

225 

class NumberedFileReader:
    """A wrapper for files that counts the lines returned by readline()."""

    def __init__(self, f):
        self.f = f
        self.n = 0

    def readline(self):
        """Return the next line of the wrapped file.

        Only non-empty results are counted, so reading at end of file
        leaves the line number unchanged.
        """
        line = self.f.readline()
        self.n += 1 if line else 0
        return line

    def linenumber(self):
        """Return how many lines have been read so far."""
        return self.n

241 

242 

class CogOptions:
    """Options for a run of cog."""

    # Command-line flags that simply switch a boolean attribute on.
    _BOOL_FLAGS = {
        '-c': 'bHashOutput',
        '-d': 'bDeleteCode',
        '-e': 'bWarnEmpty',
        '-r': 'bReplace',
        '-P': 'bPrintOutput',
        '-U': 'bNewlines',
        '-v': 'bShowVersion',
        '-x': 'bNoGenerate',
        '-z': 'bEofCanBeEnd',
        '--check': 'bCheck',
    }

    # Command-line flags whose argument is stored unchanged in an attribute.
    _VALUE_FLAGS = {
        '-n': 'sEncoding',
        '-o': 'sOutputName',
        '-s': 'sSuffix',
        '-p': 'sPrologue',
        '-w': 'sMakeWritableCmd',
    }

    def __init__(self):
        # Defaults for argument values.
        self.args = []
        self.includePath = []
        self.defines = {}
        self.bShowVersion = False
        self.sMakeWritableCmd = None
        self.bReplace = False
        self.bNoGenerate = False
        self.sOutputName = None
        self.bWarnEmpty = False
        self.bHashOutput = False
        self.bDeleteCode = False
        self.bEofCanBeEnd = False
        self.sSuffix = None
        self.bNewlines = False
        self.sBeginSpec = '[[[cog'
        self.sEndSpec = ']]]'
        self.sEndOutput = '[[[end]]]'
        self.sEncoding = "utf-8"
        self.verbosity = 2
        self.sPrologue = ''
        self.bPrintOutput = False
        self.bCheck = False

    def __eq__(self, other):
        """Comparison operator for tests to use.

        Two CogOptions are equal when all their attributes are equal.
        Anything that is not a CogOptions yields NotImplemented, so that
        ``options == 5`` is simply False instead of an AttributeError.
        """
        if not isinstance(other, CogOptions):
            return NotImplemented
        return self.__dict__ == other.__dict__

    def clone(self):
        """Make a clone of these options, for further refinement."""
        return copy.deepcopy(self)

    def addToIncludePath(self, dirs):
        """Add directories to the include path.

        ``dirs`` is a single string; several directories are separated
        by os.pathsep.
        """
        self.includePath.extend(dirs.split(os.pathsep))

    def parseArgs(self, argv):
        """Parse command-line arguments into this object's attributes.

        ``argv`` is the list of arguments without the program name.
        Whatever is left over after the options is stored in ``self.args``.

        Raises CogUsageError for an unknown option, a missing option
        argument, a -D value without '=', a malformed --markers value, or
        a --verbosity value that is not an integer.
        """
        try:
            opts, self.args = getopt.getopt(
                argv,
                'cdD:eI:n:o:rs:p:PUvw:xz',
                [
                    'check',
                    'markers=',
                    'verbosity=',
                ]
            )
        except getopt.error as msg:
            raise CogUsageError(msg)

        # Handle the command line arguments.
        for o, a in opts:
            if o in self._BOOL_FLAGS:
                setattr(self, self._BOOL_FLAGS[o], True)
            elif o in self._VALUE_FLAGS:
                setattr(self, self._VALUE_FLAGS[o], a)
            elif o == '-D':
                if '=' not in a:
                    raise CogUsageError("-D takes a name=value argument")
                # Only the first '=' separates; the value may contain more.
                name, value = a.split('=', 1)
                self.defines[name] = value
            elif o == '-I':
                self.addToIncludePath(os.path.abspath(a))
            elif o == '--markers':
                self._parse_markers(a)
            elif o == '--verbosity':
                try:
                    self.verbosity = int(a)
                except ValueError:
                    # Report bad input as a usage error, not a traceback.
                    raise CogUsageError(
                        f"--verbosity requires an integer, could not parse {a!r}"
                    )
            else:
                # Since getopt.getopt is given a list of possible flags,
                # this is an internal error.
                raise CogInternalError(f"Don't understand argument {o}")

    def _parse_markers(self, val):
        """Split a --markers value into begin, end and end-output markers."""
        try:
            self.sBeginSpec, self.sEndSpec, self.sEndOutput = val.split(" ")
        except ValueError:
            raise CogUsageError(
                f"--markers requires 3 values separated by spaces, could not parse {val!r}"
            )

    def validate(self):
        """Does nothing if everything is OK, raises CogError's if it's not."""
        if self.bReplace and self.bDeleteCode:
            raise CogUsageError("Can't use -d with -r (or you would delete all your source!)")

        if self.bReplace and self.sOutputName:
            raise CogUsageError("Can't use -o with -r (they are opposites)")

366 

367 

class Cog(Redirectable):
    """The Cog engine.

    Reads files containing generator code between marker lines, runs
    that code, and writes the files back out with the generated output
    placed between the end-spec and end-output markers.
    """

    def __init__(self):
        super().__init__()
        self.options = CogOptions()
        self._fixEndOutputPatterns()
        self.cogmodulename = "cog"
        self.createCogModule()
        self.bCheckFailed = False

    def _fixEndOutputPatterns(self):
        """Rebuild the end-output regex and format from the current options.

        Must be called again whenever ``self.options.sEndOutput`` changes.
        """
        end_output = re.escape(self.options.sEndOutput)
        self.reEndOutput = re.compile(end_output + r"(?P<hashsect> *\(checksum: (?P<hash>[a-f0-9]+)\))")
        # sEndFormat is filled in with the % operator, so a literal '%' in
        # a custom end-output marker has to be doubled.
        self.sEndFormat = self.options.sEndOutput.replace('%', '%%') + " (checksum: %s)"

    def showWarning(self, msg):
        """Print a warning on this object's stdout."""
        self.prout(f"Warning: {msg}")

    def isBeginSpecLine(self, s):
        """Is the begin-spec marker somewhere in line ``s``?"""
        return self.options.sBeginSpec in s

    def isEndSpecLine(self, s):
        """Is the end-spec marker in ``s``, on a line that is not an end-output line?"""
        return self.options.sEndSpec in s and not self.isEndOutputLine(s)

    def isEndOutputLine(self, s):
        """Is the end-output marker somewhere in line ``s``?"""
        return self.options.sEndOutput in s

    def createCogModule(self):
        """Make a cog "module" object so that imported Python modules
        can say "import cog" and get our state.
        """
        self.cogmodule = types.SimpleNamespace()
        self.cogmodule.path = []

    def openOutputFile(self, fname):
        """Open an output file, taking all the details into account.

        The file is opened for writing with the configured encoding, and
        with LF-only newlines when the -U option is set.  A missing
        directory for the file is created first, also when it is a
        single relative directory such as "out/result.txt".
        """
        opts = {'encoding': self.options.sEncoding}
        if self.options.bNewlines:
            opts["newline"] = "\n"
        fdir = os.path.dirname(fname)
        if fdir:
            os.makedirs(fdir, exist_ok=True)
        return open(fname, "w", **opts)

    def openInputFile(self, fname):
        """Open an input file; the name "-" means standard input."""
        if fname == "-":
            return sys.stdin
        return open(fname, encoding=self.options.sEncoding)

    def processFile(self, fIn, fOut, fname=None, globals=None):
        """Process an input file object to an output file object.

        fIn and fOut can be file objects, or file names.  Files that are
        opened here are also closed here, whatever happens.

        Arguments:
            fIn: the input file, or its name.
            fOut: the output file, or its name.
            fname: name used in error messages and for the cog module
                when fIn / fOut are file objects.
            globals: dict used as globals for all generators in the file;
                a fresh dict when None.  The -D defines are added to it.

        Raises CogError for misplaced or missing markers and for edited
        checksummed output, plus whatever evaluating a generator raises.
        """
        sFileIn = fname or ''
        sFileOut = fname or ''
        fInToClose = fOutToClose = None

        try:
            # Convert filenames to files.  This happens inside the try so
            # that the input is closed again if the output cannot be opened.
            if isinstance(fIn, (bytes, str)):
                # Open the input file.
                sFileIn = fIn
                fIn = fInToClose = self.openInputFile(fIn)
            if isinstance(fOut, (bytes, str)):
                # Open the output file.
                sFileOut = fOut
                fOut = fOutToClose = self.openOutputFile(fOut)

            fIn = NumberedFileReader(fIn)

            bSawCog = False

            self.cogmodule.inFile = sFileIn
            self.cogmodule.outFile = sFileOut
            self.cogmodulename = 'cog_' + hashlib.md5(sFileOut.encode()).hexdigest()
            sys.modules[self.cogmodulename] = self.cogmodule
            # if "import cog" explicitly done in code by user, note threading will cause clashes.
            sys.modules['cog'] = self.cogmodule

            # The globals dict we'll use for this file.
            if globals is None:
                globals = {}

            # If there are any global defines, put them in the globals.
            globals.update(self.options.defines)

            # loop over generator chunks
            l = fIn.readline()
            while l:
                # Find the next spec begin
                while l and not self.isBeginSpecLine(l):
                    if self.isEndSpecLine(l):
                        raise CogError(
                            f"Unexpected {self.options.sEndSpec!r}",
                            file=sFileIn,
                            line=fIn.linenumber(),
                        )
                    if self.isEndOutputLine(l):
                        raise CogError(
                            f"Unexpected {self.options.sEndOutput!r}",
                            file=sFileIn,
                            line=fIn.linenumber(),
                        )
                    fOut.write(l)
                    l = fIn.readline()
                if not l:
                    break
                if not self.options.bDeleteCode:
                    fOut.write(l)

                # l is the begin spec
                gen = CogGenerator(options=self.options)
                gen.setOutput(stdout=self.stdout)
                gen.parseMarker(l)
                firstLineNum = fIn.linenumber()
                self.cogmodule.firstLineNum = firstLineNum

                # If the spec begin is also a spec end, then process the single
                # line of code inside.
                if self.isEndSpecLine(l):
                    beg = l.find(self.options.sBeginSpec)
                    end = l.find(self.options.sEndSpec)
                    if beg > end:
                        raise CogError("Cog code markers inverted",
                            file=sFileIn, line=firstLineNum)
                    else:
                        sCode = l[beg+len(self.options.sBeginSpec):end].strip()
                        gen.parseLine(sCode)
                else:
                    # Deal with an ordinary code block.
                    l = fIn.readline()

                    # Get all the lines in the spec
                    while l and not self.isEndSpecLine(l):
                        if self.isBeginSpecLine(l):
                            raise CogError(
                                f"Unexpected {self.options.sBeginSpec!r}",
                                file=sFileIn,
                                line=fIn.linenumber(),
                            )
                        if self.isEndOutputLine(l):
                            raise CogError(
                                f"Unexpected {self.options.sEndOutput!r}",
                                file=sFileIn,
                                line=fIn.linenumber(),
                            )
                        if not self.options.bDeleteCode:
                            fOut.write(l)
                        gen.parseLine(l)
                        l = fIn.readline()
                    if not l:
                        raise CogError(
                            "Cog block begun but never ended.",
                            file=sFileIn, line=firstLineNum)

                    if not self.options.bDeleteCode:
                        fOut.write(l)
                    gen.parseMarker(l)

                l = fIn.readline()

                # Eat all the lines in the output section.  While reading past
                # them, compute the md5 hash of the old output.
                previous = ""
                hasher = hashlib.md5()
                while l and not self.isEndOutputLine(l):
                    if self.isBeginSpecLine(l):
                        raise CogError(
                            f"Unexpected {self.options.sBeginSpec!r}",
                            file=sFileIn,
                            line=fIn.linenumber(),
                        )
                    if self.isEndSpecLine(l):
                        raise CogError(
                            f"Unexpected {self.options.sEndSpec!r}",
                            file=sFileIn,
                            line=fIn.linenumber(),
                        )
                    previous += l
                    hasher.update(l.encode("utf-8"))
                    l = fIn.readline()
                curHash = hasher.hexdigest()

                if not l and not self.options.bEofCanBeEnd:
                    # We reached end of file before we found the end output line.
                    raise CogError(
                        f"Missing {self.options.sEndOutput!r} before end of file.",
                        file=sFileIn,
                        line=fIn.linenumber(),
                    )

                # Make the previous output available to the current code
                self.cogmodule.previous = previous

                # Write the output of the spec to be the new output if we're
                # supposed to generate code.
                hasher = hashlib.md5()
                if not self.options.bNoGenerate:
                    sFile = f"<cog {sFileIn}:{firstLineNum}>"
                    sGen = gen.evaluate(cog=self, globals=globals, fname=sFile)
                    sGen = self.suffixLines(sGen)
                    hasher.update(sGen.encode("utf-8"))
                    fOut.write(sGen)
                newHash = hasher.hexdigest()

                bSawCog = True

                # Write the ending output line
                hashMatch = self.reEndOutput.search(l)
                if self.options.bHashOutput:
                    if hashMatch:
                        oldHash = hashMatch['hash']
                        if oldHash != curHash:
                            raise CogError("Output has been edited! Delete old checksum to unprotect.",
                                file=sFileIn, line=fIn.linenumber())
                        # Create a new end line with the correct hash.
                        endpieces = l.split(hashMatch.group(0), 1)
                    else:
                        # There was no old hash, but we want a new hash.
                        endpieces = l.split(self.options.sEndOutput, 1)
                    l = (self.sEndFormat % newHash).join(endpieces)
                else:
                    # We don't want hashes output, so if there was one, get rid of
                    # it.
                    if hashMatch:
                        l = l.replace(hashMatch['hashsect'], '', 1)

                if not self.options.bDeleteCode:
                    fOut.write(l)
                l = fIn.readline()

            if not bSawCog and self.options.bWarnEmpty:
                self.showWarning(f"no cog code found in {sFileIn}")
        finally:
            if fInToClose:
                fInToClose.close()
            if fOutToClose:
                fOutToClose.close()

    # A regex for non-empty lines, used by suffixLines.
    reNonEmptyLines = re.compile(r"^\s*\S+.*$", re.MULTILINE)

    def suffixLines(self, text):
        """Add suffixes to the lines in text, if our options desire it.
        text is many lines, as a single string.
        """
        if self.options.sSuffix:
            # Find all non-blank lines, and add the suffix to the end.
            # Backslashes are doubled so the suffix is taken literally by
            # the regex replacement.
            repl = r"\g<0>" + self.options.sSuffix.replace('\\', '\\\\')
            text = self.reNonEmptyLines.sub(repl, text)
        return text

    def processString(self, sInput, fname=None):
        """Process sInput as the text to cog.
        Return the cogged output as a string.
        """
        fOld = io.StringIO(sInput)
        fNew = io.StringIO()
        self.processFile(fOld, fNew, fname=fname)
        return fNew.getvalue()

    def replaceFile(self, sOldPath, sNewText):
        """Replace file sOldPath with the contents sNewText.

        When the file is not writable, the -w command (if any) is run to
        make it so.  Raises CogError when the file cannot be made
        writable.
        """
        if not os.access(sOldPath, os.W_OK):
            # Need to ensure we can write.
            if self.options.sMakeWritableCmd:
                # Use an external command to make the file writable.
                # NOTE(review): the file name is substituted into a shell
                # command line unquoted; names with shell metacharacters
                # are not safe here.
                cmd = self.options.sMakeWritableCmd.replace('%s', sOldPath)
                self.stdout.write(os.popen(cmd).read())
                if not os.access(sOldPath, os.W_OK):
                    raise CogError(f"Couldn't make {sOldPath} writable")
            else:
                # Can't write!
                raise CogError(f"Can't overwrite {sOldPath}")
        # The with block closes the file even when the write fails.
        with self.openOutputFile(sOldPath) as f:
            f.write(sNewText)

    def saveIncludePath(self):
        """Remember copies of the include path and of sys.path."""
        self.savedInclude = self.options.includePath[:]
        self.savedSysPath = sys.path[:]

    def restoreIncludePath(self):
        """Put back what saveIncludePath() remembered."""
        self.options.includePath = self.savedInclude
        self.cogmodule.path = self.options.includePath
        sys.path = self.savedSysPath

    def addToIncludePath(self, includePath):
        """Append the directories in ``includePath`` to cog.path and sys.path."""
        self.cogmodule.path.extend(includePath)
        sys.path.extend(includePath)

    def processOneFile(self, sFile):
        """Process one filename through cog.

        Where the output goes depends on the options: to the -o file, back
        into the input file (-r), nowhere (--check, which only records
        whether the file would change), or to this object's stdout.
        """
        self.saveIncludePath()
        bNeedNewline = False

        try:
            self.addToIncludePath(self.options.includePath)
            # Since we know where the input file came from,
            # push its directory onto the include path.
            self.addToIncludePath([os.path.dirname(sFile)])

            # How we process the file depends on where the output is going.
            if self.options.sOutputName:
                self.processFile(sFile, self.options.sOutputName, sFile)
            elif self.options.bReplace or self.options.bCheck:
                # We want to replace the cog file with the output,
                # but only if they differ.
                verb = "Cogging" if self.options.bReplace else "Checking"
                if self.options.verbosity >= 2:
                    self.prout(f"{verb} {sFile}", end="")
                    bNeedNewline = True

                try:
                    # The with block closes the input even if reading fails.
                    with self.openInputFile(sFile) as fOldFile:
                        sOldText = fOldFile.read()
                    sNewText = self.processString(sOldText, fname=sFile)
                    if sOldText != sNewText:
                        if self.options.verbosity >= 1:
                            if self.options.verbosity < 2:
                                self.prout(f"{verb} {sFile}", end="")
                            self.prout(" (changed)")
                            bNeedNewline = False
                        if self.options.bReplace:
                            self.replaceFile(sFile, sNewText)
                        else:
                            assert self.options.bCheck
                            self.bCheckFailed = True
                finally:
                    # The try-finally block is so we can print a partial line
                    # with the name of the file, and print (changed) on the
                    # same line, but also make sure to break the line before
                    # any traceback.
                    if bNeedNewline:
                        self.prout("")
            else:
                self.processFile(sFile, self.stdout, sFile)
        finally:
            self.restoreIncludePath()

    def processWildcards(self, sFile):
        """Process every file matching the glob pattern ``sFile``.

        When nothing matches, ``sFile`` itself is processed as a file name.
        """
        files = glob.glob(sFile)
        if files:
            for sMatchingFile in files:
                self.processOneFile(sMatchingFile)
        else:
            self.processOneFile(sFile)

    def processFileList(self, sFileList):
        """Process the files in a file list.

        Every line of the list is split like a shell command line ('#'
        starts a comment) and handled as a command line of its own.
        """
        with self.openInputFile(sFileList) as flist:
            lines = flist.readlines()
        for l in lines:
            # Use shlex to parse the line like a shell.
            lex = shlex.shlex(l, posix=True)
            lex.whitespace_split = True
            lex.commenters = '#'
            # No escapes, so that backslash can be part of the path
            lex.escape = ''
            args = list(lex)
            if args:
                self.processArguments(args)

    def processArguments(self, args):
        """Process one command-line.

        ``args[0]`` is a file name, a wildcard pattern, or ``@filelist``;
        the remaining items are options that apply to it only.  The
        options in effect before the call are restored afterwards, also
        when processing fails.
        """
        saved_options = self.options
        self.options = self.options.clone()

        try:
            self.options.parseArgs(args[1:])
            self.options.validate()
            # The extra options may include --markers, so the end-output
            # patterns have to follow the options now in effect.
            self._fixEndOutputPatterns()

            if args[0].startswith('@'):
                if self.options.sOutputName:
                    raise CogUsageError("Can't use -o with @file")
                self.processFileList(args[0][1:])
            else:
                self.processWildcards(args[0])
        finally:
            self.options = saved_options
            self._fixEndOutputPatterns()

    def callableMain(self, argv):
        """All of command-line cog, but in a callable form.
        This is used by main.
        argv is the equivalent of sys.argv.

        Raises CogUsageError when no files are given and CogCheckFailed
        when --check found a file that would change.
        """
        argv = argv[1:]

        # Provide help if asked for anywhere in the command line.
        if '-?' in argv or '-h' in argv:
            self.prerr(usage, end="")
            return

        self.options.parseArgs(argv)
        self.options.validate()
        self._fixEndOutputPatterns()

        if self.options.bShowVersion:
            self.prout(f"Cog version {__version__}")
            return

        if self.options.args:
            for a in self.options.args:
                self.processArguments([a])
        else:
            raise CogUsageError("No files to process")

        if self.bCheckFailed:
            raise CogCheckFailed("Check failed")

    def main(self, argv):
        """Handle the command-line execution for cog.

        Returns the process exit status: 0 on success, 2 for usage
        errors, 3 for errors raised by a generator through cog.error,
        4 for exceptions in generator code, 5 for a failed --check and
        1 for any other CogError.
        """
        try:
            self.callableMain(argv)
            return 0
        except CogUsageError as err:
            self.prerr(err)
            self.prerr("(for help use -h)")
            return 2
        except CogGeneratedError as err:
            self.prerr(f"Error: {err}")
            return 3
        except CogUserException as err:
            self.prerr("Traceback (most recent call last):")
            self.prerr(err.args[0])
            return 4
        except CogCheckFailed as err:
            self.prerr(err)
            return 5
        except CogError as err:
            self.prerr(err)
            return 1

816 

817 

def find_cog_source(frame_summary, prologue):
    """Find cog source lines in a frame summary list, for printing tracebacks.

    Frames that already carry source text, or whose file name does not
    have the form ``<cog FILE:LINE>``, are passed through unchanged.

    Arguments:
        frame_summary: an iterable of 4-item tuples
            (filename, lineno, funcname, source), as returned by
            traceback.extract_tb.
        prologue: the text of the code prologue.

    Yields:
        4-item tuples, updated to correct the cog entries: a frame inside
        the prologue is reported in the file '<prologue>', any other cog
        frame gets the real file name, line number and source line.

    """
    prolines = prologue.splitlines()
    for filename, lineno, funcname, source in frame_summary:
        if not source:
            # The file name part is matched greedily up to the last colon,
            # so names that contain colons themselves (a Windows drive
            # such as "C:\\src\\x.txt") are recognized too.
            m = re.search(r"^<cog (.+):(\d+)>$", filename)
            if m:
                if lineno <= len(prolines):
                    filename = '<prologue>'
                    source = prolines[lineno-1]
                    lineno -= 1   # Because "import cog" is the first line in the prologue
                else:
                    filename, coglineno = m.groups()
                    coglineno = int(coglineno)
                    lineno += coglineno - len(prolines)
                    source = linecache.getline(filename, lineno).strip()
        yield filename, lineno, funcname, source

844 

845 

def main():
    """Main function for entry_points to use.

    Runs a new Cog engine on sys.argv and returns its exit status.
    """
    engine = Cog()
    return engine.main(sys.argv)