Coverage for silkaj/wot/status.py: 94%

77 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-09-04 17:03 +0000

1# Copyright 2016-2024 Maël Azimi <m.a@moul.re> 

2# 

3# Silkaj is free software: you can redistribute it and/or modify 

4# it under the terms of the GNU Affero General Public License as published by 

5# the Free Software Foundation, either version 3 of the License, or 

6# (at your option) any later version. 

7# 

8# Silkaj is distributed in the hope that it will be useful, 

9# but WITHOUT ANY WARRANTY; without even the implied warranty of 

10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

11# GNU Affero General Public License for more details. 

12# 

13# You should have received a copy of the GNU Affero General Public License 

14# along with Silkaj. If not, see <https://www.gnu.org/licenses/>. 

15 

16from collections import OrderedDict 

17from typing import Dict, List, Tuple 

18 

19import rich_click as click 

20from duniterpy.api.bma import blockchain, wot 

21from pendulum import from_timestamp, now 

22 

23from silkaj.blockchain.tools import get_blockchain_parameters 

24from silkaj.constants import DATE 

25from silkaj.network import client_instance 

26from silkaj.public_key import gen_pubkey_checksum, is_pubkey_and_check 

27from silkaj.tui import Table 

28from silkaj.wot import tools as wt 

29 

30 

31def get_sent_certifications( 

32 signed: List, 

33 time_first_block: int, 

34 params: Dict, 

35) -> Tuple[List[str], List[str]]: 

36 sent = [] 

37 expire = [] 

38 if signed: 

39 for cert in signed: 

40 sent.append(cert["uid"]) 

41 expire.append( 

42 expiration_date_from_block_id( 

43 cert["cert_time"]["block"], 

44 time_first_block, 

45 params, 

46 ), 

47 ) 

48 return sent, expire 

49 

50 

51@click.command( 

52 "status", 

53 help="Check received and sent certifications and \ 

54consult the membership status of any given identity", 

55) 

56@click.argument("uid_pubkey") 

57def status(uid_pubkey: str) -> None: 

58 """ 

59 get searched id 

60 get id of received and sent certifications 

61 display in a table the result with the numbers 

62 """ 

63 client = client_instance() 

64 first_block = client(blockchain.block, 1) 

65 time_first_block = first_block["time"] 

66 

67 checked_pubkey = is_pubkey_and_check(uid_pubkey) 

68 if checked_pubkey: 

69 uid_pubkey = str(checked_pubkey) 

70 

71 identity, pubkey, signed = wt.choose_identity(uid_pubkey) 

72 certifications = OrderedDict() # type: OrderedDict 

73 params = get_blockchain_parameters() 

74 

75 req = None 

76 requirements = client(wot.requirements, search=pubkey, pubkey=True) 

77 for req in requirements["identities"]: 

78 if req["pubkey"] == pubkey: 

79 break 

80 

81 certifications["received_expire"] = [] 

82 certifications["received"] = [] 

83 certifications["sent"] = [] 

84 certifications["sent_expire"] = [] 

85 for cert in identity["others"]: 

86 certifications["received_expire"].append( 

87 expiration_date_from_block_id( 

88 cert["meta"]["block_number"], 

89 time_first_block, 

90 params, 

91 ), 

92 ) 

93 certifications["received"].append( 

94 cert_written_in_the_blockchain(req["certifications"], cert), 

95 ) 

96 ( 

97 certifications["sent"], 

98 certifications["sent_expire"], 

99 ) = get_sent_certifications(signed, time_first_block, params) 

100 nbr_sent_certs = len(certifications["sent"]) if "sent" in certifications else 0 

101 

102 table = Table(style="columns").set_cols_align(["r", "r", "r", "r"]) 

103 table.fill_from_dict(certifications) 

104 

105 print( 

106 f'{identity["uid"]} ({gen_pubkey_checksum(pubkey, True)} \ 

107 ) \ 

108from block #{identity["meta"]["timestamp"][:15]} \ 

109 …\n\ 

110received {len(certifications["received"])} \ 

111 and sent \ 

112{nbr_sent_certs}/{params["sigStock"]} \ 

113 certifications:\n\ 

114{table.draw()} \ 

115 \n\ 

116✔: Certification available to be written or already written into the blockchain\n', 

117 ) 

118 membership_status(certifications, pubkey, req) 

119 

120 

121def cert_written_in_the_blockchain(written_certs: Dict, certifieur: Dict): 

122 for cert in written_certs: 

123 if cert["from"] == certifieur["pubkey"]: 

124 return certifieur["uids"][0] + " ✔" 

125 return certifieur["uids"][0] 

126 

127 

128def membership_status(certifications: OrderedDict, pubkey: str, req: Dict) -> None: 

129 params = get_blockchain_parameters() 

130 if len(certifications["received"]) >= params["sigQty"]: 

131 date = certifications["received_expire"][ 

132 len(certifications["received"]) - params["sigQty"] 

133 ] 

134 print(f"Membership expiration due to certification expirations: {date}") 

135 member_lookup = wt.is_member(pubkey) 

136 is_member = bool(member_lookup) 

137 print("member:", is_member) 

138 if req["revoked"]: 

139 revoke_date = from_timestamp(req["revoked_on"], tz="local").format(DATE) 

140 print(f"revoked: {req['revoked']}\nrevoked on: {revoke_date}") 

141 if not is_member and req["wasMember"]: 

142 print("expired:", req["expired"], "\nwasMember:", req["wasMember"]) 

143 elif is_member: 

144 expiration_date = now().add(seconds=req["membershipExpiresIn"]).format(DATE) 

145 print(f"Membership document expiration: {expiration_date}") 

146 print("Sentry:", req["isSentry"]) 

147 print("outdistanced:", req["outdistanced"]) 

148 

149 

150def expiration_date_from_block_id( 

151 block_id: str, 

152 time_first_block: int, 

153 params: Dict, 

154) -> str: 

155 expir_timestamp = ( 

156 date_approximation(block_id, time_first_block, params["avgGenTime"]) 

157 + params["sigValidity"] 

158 ) 

159 return from_timestamp(expir_timestamp, tz="local").format(DATE) 

160 

161 

162def date_approximation(block_id, time_first_block, avgentime): 

163 return time_first_block + block_id * avgentime