Coverage for silkaj/money/transfer.py: 93%

210 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-17 17:03 +0000

1# Copyright 2016-2024 Maël Azimi <m.a@moul.re> 

2# 

3# Silkaj is free software: you can redistribute it and/or modify 

4# it under the terms of the GNU Affero General Public License as published by 

5# the Free Software Foundation, either version 3 of the License, or 

6# (at your option) any later version. 

7# 

8# Silkaj is distributed in the hope that it will be useful, 

9# but WITHOUT ANY WARRANTY; without even the implied warranty of 

10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

11# GNU Affero General Public License for more details. 

12# 

13# You should have received a copy of the GNU Affero General Public License 

14# along with Silkaj. If not, see <https://www.gnu.org/licenses/>. 

15 

16import math 

17import re 

18import shlex 

19import time 

20from pathlib import Path 

21from typing import List, Optional, Tuple 

22 

23import rich_click as click 

24from duniterpy.api.bma.tx import process 

25from duniterpy.documents import ( 

26 BlockID, 

27 InputSource, 

28 OutputSource, 

29 SIGParameter, 

30 Transaction, 

31 Unlock, 

32) 

33from duniterpy.key import SigningKey 

34 

35from silkaj import auth, network, public_key, tools, tui 

36from silkaj.blockchain import tools as bc_tools 

37from silkaj.constants import ( 

38 BMA_SLEEP, 

39 CENT_MULT_TO_UNIT, 

40 MINIMAL_ABSOLUTE_TX_AMOUNT, 

41 MINIMAL_RELATIVE_TX_AMOUNT, 

42) 

43from silkaj.money import tools as m_tools 

44from silkaj.public_key import gen_pubkey_checksum 

45 

# Longest comment accepted for a transaction (enforced by checkComment).
MAX_COMMENT_LENGTH = 255


# A transaction document may hold at most 100 lines.
# The number of fields that fit in one document is bounded by:
#   (2 * IU + 2 * IS + OUT) <= (MAX_LINES_IN_TX_DOC - FIX_LINES)
# where IU = inputs/unlocks, IS = issuers/signatures and OUT = outputs.
MAX_LINES_IN_TX_DOC = 100
# Two lines are always required, and one more is reserved for the comment.
FIX_LINES = 3
# For now, silkaj handles transactions with one issuer only.
NBR_ISSUERS = 1
# With 1 issuer and 2 outputs, the formula above allows at most 46 inputs.
MAX_INPUTS_PER_TX = (MAX_LINES_IN_TX_DOC - FIX_LINES - 2 * NBR_ISSUERS - 2) // 2
# With 1 issuer and 1 input, the formula above allows at most 93 outputs.
MAX_OUTPUTS = MAX_LINES_IN_TX_DOC - FIX_LINES - 2 * NBR_ISSUERS - 2 * 1

62 

63 

@click.command("transfer", help="Transfer money")
@click.option(
    "amounts",
    "--amount",
    "-a",
    multiple=True,
    type=click.FloatRange(MINIMAL_ABSOLUTE_TX_AMOUNT),
    help=f"Quantitative amount(s):\n-a <amount>\nMinimum amount is {MINIMAL_ABSOLUTE_TX_AMOUNT}.",
    cls=tools.MutuallyExclusiveOption,
    mutually_exclusive=["amountsud", "allsources", "file_path"],
)
@click.option(
    "amountsud",
    "--amountUD",
    "-d",
    multiple=True,
    type=click.FloatRange(MINIMAL_RELATIVE_TX_AMOUNT),
    help=f"Relative amount(s):\n-d <amount_UD>\nMinimum amount is {MINIMAL_RELATIVE_TX_AMOUNT}",
    cls=tools.MutuallyExclusiveOption,
    mutually_exclusive=["amounts", "allsources", "file_path"],
)
@click.option(
    "--allSources",
    is_flag=True,
    help="Send all sources to one recipient",
    cls=tools.MutuallyExclusiveOption,
    mutually_exclusive=["amounts", "amountsud", "file_path"],
)
@click.option(
    "recipients",
    "--recipient",
    "-r",
    multiple=True,
    help=(
        "Pubkey(s)` recipients + optional checksum:\n-r <pubkey>[:checksum]\n"
        "Sending to many recipients is possible:\n"
        "* With one amount, all will receive the amount\n"
        "* With many amounts (one per recipient)"
    ),
    cls=tools.MutuallyExclusiveOption,
    mutually_exclusive=["file_path"],
)
@click.option(
    "file_path",
    "--file",
    "-f",
    type=click.Path(exists=True, dir_okay=False, path_type=Path),
    help=(
        "File`s path containing a list of amounts in absolute or "
        "relative reference and recipients` pubkeys"
    ),
    cls=tools.MutuallyExclusiveOption,
    # Entries are the option *parameter names*; the relative-amount option is
    # declared as "amountsud" (lowercase), like in the sibling options above.
    mutually_exclusive=["recipients", "amounts", "amountsud", "allsources"],
)
@click.option("--comment", "-c", default="", help="Comment")
@click.option(
    "--outputBackChange",
    help="Pubkey recipient to send the rest of the transaction: <pubkey[:checksum]>",
)
@click.option(
    "--yes",
    "-y",
    is_flag=True,
    help="Assume yes. Do not prompt confirmation",
)
def transfer_money(
    amounts: List[float],
    amountsud: List[float],
    allsources: bool,
    recipients: List[str],
    file_path: Path,
    comment: str,
    outputbackchange: str,
    yes: bool,
) -> None:
    """
    Entry point of the `transfer` command.

    Amounts and recipients come either from a file (--file) or from the
    --amount / --amountUD / --allSources and --recipient options.
    The issuer is authenticated, the transaction values are checked,
    a confirmation table is displayed (unless --yes is passed),
    and the transaction(s) are then generated and sent.

    Every validation failure is reported through tools.message_exit.
    """
    if file_path:
        tx_amounts, recipients = parse_file_containing_amounts_recipients(file_path)
    else:
        if not (amounts or amountsud or allsources):
            tools.message_exit("Error: amount, amountUD or allSources is not set.")
        if not recipients:
            tools.message_exit("Error: A recipient should be passed")
        if allsources and len(recipients) > 1:
            tools.message_exit(
                "Error: the --allSources option can only be used with one recipient.",
            )
        # With --allSources the amount is only known once the balance is fetched
        if not allsources:
            tx_amounts = transaction_amount(amounts, amountsud, recipients)

    key = auth.auth_method()
    issuer_pubkey = key.pubkey

    # Only the first element of the returned value is used here, as the balance
    pubkey_amount = m_tools.get_amount_from_pubkey(issuer_pubkey)
    if allsources:
        if pubkey_amount[0] <= 0:
            tools.message_exit(
                f"Error: Issuer pubkey {gen_pubkey_checksum(issuer_pubkey)} "
                "is empty. No transaction sent.",
            )

        tx_amounts = [pubkey_amount[0]]

    # check_transaction_values rewrites the recipients in place: it needs a list
    recipients = list(recipients)
    outputbackchange = check_transaction_values(
        comment,
        recipients,
        outputbackchange,
        # True when the balance does NOT cover the transaction
        pubkey_amount[0] < sum(tx_amounts),
        issuer_pubkey,
    )

    if not yes:
        table = tui.Table()
        table.fill_rows(
            gen_confirmation_table(
                issuer_pubkey,
                pubkey_amount[0],
                tx_amounts,
                recipients,
                outputbackchange,
                comment,
            ),
        )
        confirmation_table = table.draw()
        if not click.confirm(
            f"{confirmation_table}\nDo you confirm sending this transaction?",
        ):
            return

    handle_intermediaries_transactions(
        key,
        issuer_pubkey,
        tx_amounts,
        recipients,
        comment,
        outputbackchange,
    )

198 

199 

def parse_file_containing_amounts_recipients(
    file_path: Path,
) -> Tuple[List[int], List[str]]:
    """
    Parse file in a specific format
    Comments (starting with #) and empty lines are ignored
    Format should be:
    ```txt
    [ABSOLUTE/RELATIVE]

    # comment1
    amount1 recipient1`s pubkey
    # comment2
    amount2 recipient2`s pubkey
    ```

    The first line of the file must be the ABSOLUTE or RELATIVE header.
    Returns the computed amounts and the recipients, in file order.
    Any malformed line (missing field, non-numeric amount, unbalanced quote)
    is reported with its line number through tools.message_exit.
    """
    reference = ""
    amounts: List[float] = []
    recipients: List[str] = []
    with file_path.open(encoding="utf-8") as file:
        for n, raw_line in enumerate(file):
            try:
                # shlex.split raises ValueError on an unbalanced quote,
                # hence the tokenisation is inside the try block as well
                line = shlex.split(raw_line, True)
                if not line:
                    continue
                if n == 0:
                    reference = line[0]
                    continue
                amounts.append(float(line[0]))
                recipients.append(line[1])
            except (ValueError, IndexError):
                tools.message_exit(f"Syntax error at line {n + 1}")

    if reference not in ("ABSOLUTE", "RELATIVE"):
        tools.message_exit(
            f"{file_path} must contain at first line 'ABSOLUTE' or 'RELATIVE' header",
        )

    if not amounts or not recipients:
        tools.message_exit("No amounts or recipients specified")

    # Compute amount depending on the reference
    reference_mult = (
        CENT_MULT_TO_UNIT if reference == "ABSOLUTE" else m_tools.get_ud_value()
    )
    tx_amounts = compute_amounts(amounts, reference_mult)

    return tx_amounts, recipients

246 

247 

def transaction_amount(
    amounts: List[float],
    UDs_amounts: List[float],
    outputAddresses: List[str],
) -> List[int]:
    """
    Build the list of amounts to send, one per recipient.

    Absolute amounts take precedence over relative (UD) amounts.
    Check that the number of passed amounts(UD) and recipients are the same,
    unless a single amount is passed: it is then sent to every recipient.
    Exits through tools.message_exit when no amount is passed
    or when the numbers of amounts and recipients do not match.
    """
    amounts_list: List[int] = []
    if amounts:
        amounts_list = compute_amounts(amounts, CENT_MULT_TO_UNIT)
    elif UDs_amounts:
        amounts_list = compute_amounts(UDs_amounts, m_tools.get_ud_value())
    else:
        # Without this branch amounts_list would be used unbound below
        tools.message_exit("Error: amount or amountUD is not set.")

    recipients_number = len(outputAddresses)
    if len(amounts_list) not in (1, recipients_number):
        tools.message_exit(
            "Error: The number of passed recipients is not the same as the passed amounts.",
        )
    # In case one amount is passed with multiple recipients
    # generate list containing multiple time the same amount
    if len(amounts_list) == 1 and recipients_number > 1:
        return [amounts_list[0]] * recipients_number
    return amounts_list

272 

273 

def compute_amounts(amounts: List[float], multiplicator: float) -> List[int]:
    """
    Convert each amount with the multiplicator and round it to an integer.

    Multiplicator should be either CENT_MULT_TO_UNIT or the UD value.
    Any multiplicator different from CENT_MULT_TO_UNIT is treated as relative:
    in that case each converted amount must reach the minimal absolute amount,
    otherwise the command exits through tools.message_exit.
    """
    is_relative = multiplicator != CENT_MULT_TO_UNIT
    converted = []
    for amount in amounts:
        value = amount * multiplicator
        # only relative amounts are checked against the minimum here
        if is_relative and value < (MINIMAL_ABSOLUTE_TX_AMOUNT * CENT_MULT_TO_UNIT):
            tools.message_exit(f"Error: amount {amount} is too low.")
        converted.append(round(value))
    return converted

291 

292 

def check_transaction_values(
    comment: str,
    outputAddresses: List[str],
    outputBackChange: str,
    enough_source: bool,
    issuer_pubkey: str,
) -> str:
    """
    Check the comment format
    Check the number of recipients, keeping one output for the backchange
    Check the pubkeys and the checksums of the recipients and the outputbackchange
    In case of a valid checksum, assign and return the pubkey without the checksum
    Check the balance is big enough for the transaction

    outputAddresses is updated in place with the validated pubkeys.
    Despite its name, enough_source must be True when the balance
    is NOT sufficient: a true value makes the command exit.
    Returns the validated outputBackChange.
    """
    checkComment(comment)
    # we check output numbers and leave one line for the backchange.
    max_recipients = MAX_OUTPUTS - 1
    if len(outputAddresses) > max_recipients:
        # exactly max_recipients outputs are accepted, hence "at most"
        tools.message_exit(
            f"Error : there should be at most {max_recipients} outputs.",
        )
    for i, outputAddress in enumerate(outputAddresses):
        if public_key.check_pubkey_format(outputAddress):
            outputAddresses[i] = public_key.validate_checksum(outputAddress)
    if outputBackChange and public_key.check_pubkey_format(outputBackChange):
        outputBackChange = public_key.validate_checksum(outputBackChange)
    if enough_source:
        pubkey = gen_pubkey_checksum(issuer_pubkey)
        tools.message_exit(
            f"{pubkey} pubkey doesn`t have enough money for this transaction.",
        )
    return outputBackChange

323 

324 

def gen_confirmation_table(
    issuer_pubkey: str,
    pubkey_amount: int,
    tx_amounts: List[int],
    outputAddresses: List[str],
    outputBackChange: str,
    comment: str,
) -> List[List]:
    """
    Build the rows of the transaction confirmation table:
    balances, issuer, each recipient with its amount,
    the optional backchange pubkey and the comment.
    """
    symbol = tools.get_currency_symbol()
    ud = m_tools.get_ud_value()
    total = sum(tx_amounts)
    rows = []  # type: List[List[str]]

    # account situation before and after the transaction
    balance_lines = (
        ("Initial balance", pubkey_amount),
        ("Total transaction amount", total),
        ("Balance after transaction", pubkey_amount - total),
    )
    for label, value in balance_lines:
        m_tools.display_amount(rows, label, value, ud, symbol)
    m_tools.display_pubkey(rows, "From", issuer_pubkey)

    # one "To" row followed by one "Amount" row per recipient
    for recipient, recipient_amount in zip(outputAddresses, tx_amounts):
        m_tools.display_pubkey(rows, "To", recipient)
        # NOTE(review): presumably throttles requests made by display_pubkey
        time.sleep(BMA_SLEEP)
        m_tools.display_amount(rows, "Amount", recipient_amount, ud, symbol)

    # closing rows
    if outputBackChange:
        m_tools.display_pubkey(rows, "Backchange", outputBackChange)
    rows.append(["Comment", comment])
    return rows

374 

375 

def get_list_input_for_transaction(
    pubkey: str,
    TXamount: int,
    outputs_number: int,
) -> Tuple[List[InputSource], int, bool]:
    """
    Select the sources of pubkey used as inputs of the next transaction.

    Sources are taken in the order returned by m_tools.get_sources until
    the amount is covered or MAX_INPUTS_PER_TX inputs are gathered.
    Returns the selected inputs, their total amount, and whether
    the transaction has to be an intermediate (change) transaction.
    Exits when the sources run out before the amount is covered.
    """
    sources = m_tools.get_sources(pubkey)[0]
    inputs_limit = max_inputs_number(outputs_number, NBR_ISSUERS)

    selected = []
    gathered = 0
    missing = TXamount
    is_intermediate = False
    for count, source in enumerate(sources, start=1):
        source_amount = m_tools.amount_in_current_base(source)
        selected.append(source)
        gathered += source_amount
        missing -= source_amount

        # Too many sources for one document: go through an intermediate tx.
        document_full_before_amount = missing > 0 and count >= MAX_INPUTS_PER_TX
        amount_needs_too_many_inputs = missing <= 0 and inputs_limit < count
        if document_full_before_amount or amount_needs_too_many_inputs:
            is_intermediate = True

        # Stop at the MAX_INPUTS_PER_TX limit, or once the amount is covered:
        # - either this is a regular tx, sent to the receiver,
        # - or an intermediate tx, sent to the issuer before the real one.
        if count >= MAX_INPUTS_PER_TX or missing <= 0:
            break

    if missing > 0 and not is_intermediate:
        tools.message_exit("Error: you don't have enough money")
    return selected, gathered, is_intermediate

410 

411 

def handle_intermediaries_transactions(
    key: SigningKey,
    issuers: str,
    tx_amounts: List[int],
    outputAddresses: List[str],
    Comment: str = "",
    OutputbackChange: Optional[str] = None,
) -> None:
    """
    Send the transaction, preceded by as many change transactions as needed.

    As long as the selected inputs call for an intermediate transaction,
    the gathered amount is sent back to the issuer ("Change operation").
    The real transaction is sent afterwards and ends the loop.
    """
    total_amount_to_send = sum(tx_amounts)
    # consider there is always one backchange output, hence +1
    outputs_number = len(outputAddresses) + 1
    while True:
        inputs_bundle = get_list_input_for_transaction(
            issuers,
            total_amount_to_send,
            outputs_number,
        )
        needs_change_tx = inputs_bundle[2]

        if not needs_change_tx:
            generate_and_send_transaction(
                key,
                issuers,
                tx_amounts,
                inputs_bundle,
                outputAddresses,
                Comment,
                OutputbackChange,
            )
            return

        # merge the gathered sources into one, owned by the issuer
        gathered_amount = inputs_bundle[1]
        generate_and_send_transaction(
            key,
            issuers,
            [gathered_amount],
            inputs_bundle,
            [issuers],
            "Change operation",
        )

450 

451 

def max_inputs_number(outputs_number: int, issuers_number: int) -> int:
    """
    Return the maximum number of inputs fitting in a transaction document.

    The backchange line is not accounted for: the caller has to include it
    in outputs_number.
    Formula: IU <= (MAX_LINES_IN_TX_DOC - FIX_LINES - O - 2*IS)/2
    """
    # lines left once the fixed lines, the issuers/signatures
    # and the outputs are accounted for
    remaining_lines = (
        MAX_LINES_IN_TX_DOC - FIX_LINES - (2 * issuers_number) - outputs_number
    )
    # each input comes with its unlock: two lines per input
    return int(remaining_lines / 2)

461 

462 

def generate_and_send_transaction(
    key: SigningKey,
    issuers: str,
    tx_amounts: List[int],
    listinput_and_amount: Tuple[List[InputSource], int, bool],
    outputAddresses: List[str],
    Comment: str,
    OutputbackChange: Optional[str] = None,
) -> None:
    """
    Print a summary of the transaction,
    then generate, sign and send the transaction document.
    """
    is_change_tx = listinput_and_amount[2]
    print("Generate Change Transaction" if is_change_tx else "Generate Transaction:")
    print(f" - From: {gen_pubkey_checksum(issuers)}")
    for amount, recipient in zip(tx_amounts, outputAddresses):
        display_sent_tx(recipient, amount)
    print(f" - Total: {sum(tx_amounts) / 100}")

    document = generate_transaction_document(
        issuers,
        tx_amounts,
        listinput_and_amount,
        outputAddresses,
        Comment,
        OutputbackChange,
    )
    document.sign(key)
    network.send_document(process, document)

496 

497 

def display_sent_tx(outputAddress: str, amount: int) -> None:
    """
    Print the recipient (with its checksum) and the amount divided by 100.
    """
    recipient = gen_pubkey_checksum(outputAddress)
    displayed_amount = amount / 100
    # fields are separated by print's default single space
    fields = (" - To: ", recipient, "\n - Amount: ", displayed_amount)
    print(*fields)

505 

506 

def generate_transaction_document(
    issuers: str,
    tx_amounts: List[int],
    listinput_and_amount: Tuple[List[InputSource], int, bool],
    outputAddresses: List[str],
    Comment: str = "",
    OutputbackChange: Optional[str] = None,
) -> Transaction:
    """
    Build the (unsigned) transaction document.

    One set of outputs is generated per recipient, then the rest of the
    inputs goes to OutputbackChange, or to the issuer when it is not set.
    The document is bound to the current head block.
    """
    inputs = listinput_and_amount[0]
    inputs_total = listinput_and_amount[1]
    spent = sum(tx_amounts)

    head_block = bc_tools.get_head_block()
    unit_base = head_block["unitbase"]

    change_recipient = OutputbackChange or issuers

    # When the issuer is not among the recipients,
    # the units below the current unit base are dropped from the total
    if issuers not in outputAddresses:
        base_factor = 10**unit_base
        spent = (spent // base_factor) * base_factor

    # Outputs to the recipients
    outputs = []  # type: List[OutputSource]
    for amount, recipient in zip(tx_amounts, outputAddresses):
        generate_output(outputs, unit_base, amount, recipient)

    # Output of the rest (backchange)
    generate_output(outputs, unit_base, inputs_total - spent, change_recipient)

    # One unlock per input
    unlocks = generate_unlocks(inputs)

    block_id = BlockID(head_block["number"], head_block["hash"])
    return Transaction(
        block_id=block_id,
        locktime=0,
        issuers=[issuers],
        inputs=inputs,
        unlocks=unlocks,
        outputs=outputs,
        comment=Comment,
        currency=head_block["currency"],
    )

557 

558 

def generate_unlocks(listinput: List[InputSource]) -> List[Unlock]:
    """
    Return one Unlock per input, each with a single SIGParameter(0).
    """
    return [
        Unlock(index=position, parameters=[SIGParameter(0)])
        for position in range(len(listinput))
    ]

564 

565 

def generate_output(
    listoutput: List[OutputSource],
    unitbase: int,
    rest: int,
    recipient_address: str,
) -> None:
    """
    Append to listoutput the outputs sending `rest` to recipient_address.

    The amount is split across unit bases, starting at unitbase and going
    down one base at a time until nothing is left.
    Nothing is appended when rest is zero or negative.
    Amounts are handled with integer arithmetic only, so that large values
    are not altered by float rounding.
    """
    while rest > 0:
        outputAmount = truncBase(rest, unitbase)
        rest -= outputAmount
        if outputAmount > 0:
            listoutput.append(
                OutputSource(
                    # outputAmount is a multiple of 10**unitbase
                    amount=outputAmount // 10**unitbase,
                    base=unitbase,
                    condition=f"SIG({recipient_address})",
                ),
            )
        # what is left is smaller than 10**unitbase: try the base below
        unitbase -= 1

585 

586 

def checkComment(comment: str) -> None:
    """
    Check the length and the characters of the transaction comment.

    Exits through tools.message_exit when the comment is longer than
    MAX_COMMENT_LENGTH or contains a character outside the allowed set.
    The whole comment has to match: a trailing newline is rejected,
    which `$` used with re.search would have let through.
    """
    if len(comment) > MAX_COMMENT_LENGTH:
        tools.message_exit("Error: Comment is too long")
    regex = re.compile(
        r"^[0-9a-zA-Z\ \-\_\:\/\;\*\[\]\(\)\?"
        r"\!\^\+\=\@\&\~\#\{\}\|\\<\>\%\.]*$",
    )
    if not regex.fullmatch(comment):
        tools.message_exit("Error: the format of the comment is invalid")

596 

597 

def truncBase(amount: int, base: int) -> int:
    """
    Round amount down to a multiple of 10**base.

    Returns 0 when amount is smaller than 10**base.
    Only integer arithmetic is used, so that the result stays exact
    for amounts and bases beyond the precision of floats.

    Raises ValueError when base is negative.
    """
    if base < 0:
        raise ValueError("base must be a non-negative integer")
    _pow = 10**base
    if amount < _pow:
        return 0
    return (amount // _pow) * _pow

602 return math.trunc(amount / _pow) * _pow