Coverage for silkaj/wot/status.py: 94%
77 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-04 17:03 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-04 17:03 +0000
1# Copyright 2016-2024 Maël Azimi <m.a@moul.re>
2#
3# Silkaj is free software: you can redistribute it and/or modify
4# it under the terms of the GNU Affero General Public License as published by
5# the Free Software Foundation, either version 3 of the License, or
6# (at your option) any later version.
7#
8# Silkaj is distributed in the hope that it will be useful,
9# but WITHOUT ANY WARRANTY; without even the implied warranty of
10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11# GNU Affero General Public License for more details.
12#
13# You should have received a copy of the GNU Affero General Public License
14# along with Silkaj. If not, see <https://www.gnu.org/licenses/>.
16from collections import OrderedDict
17from typing import Dict, List, Tuple
19import rich_click as click
20from duniterpy.api.bma import blockchain, wot
21from pendulum import from_timestamp, now
23from silkaj.blockchain.tools import get_blockchain_parameters
24from silkaj.constants import DATE
25from silkaj.network import client_instance
26from silkaj.public_key import gen_pubkey_checksum, is_pubkey_and_check
27from silkaj.tui import Table
28from silkaj.wot import tools as wt
31def get_sent_certifications(
32 signed: List,
33 time_first_block: int,
34 params: Dict,
35) -> Tuple[List[str], List[str]]:
36 sent = []
37 expire = []
38 if signed:
39 for cert in signed:
40 sent.append(cert["uid"])
41 expire.append(
42 expiration_date_from_block_id(
43 cert["cert_time"]["block"],
44 time_first_block,
45 params,
46 ),
47 )
48 return sent, expire
51@click.command(
52 "status",
53 help="Check received and sent certifications and \
54consult the membership status of any given identity",
55)
56@click.argument("uid_pubkey")
57def status(uid_pubkey: str) -> None:
58 """
59 get searched id
60 get id of received and sent certifications
61 display in a table the result with the numbers
62 """
63 client = client_instance()
64 first_block = client(blockchain.block, 1)
65 time_first_block = first_block["time"]
67 checked_pubkey = is_pubkey_and_check(uid_pubkey)
68 if checked_pubkey:
69 uid_pubkey = str(checked_pubkey)
71 identity, pubkey, signed = wt.choose_identity(uid_pubkey)
72 certifications = OrderedDict() # type: OrderedDict
73 params = get_blockchain_parameters()
75 req = None
76 requirements = client(wot.requirements, search=pubkey, pubkey=True)
77 for req in requirements["identities"]:
78 if req["pubkey"] == pubkey:
79 break
81 certifications["received_expire"] = []
82 certifications["received"] = []
83 certifications["sent"] = []
84 certifications["sent_expire"] = []
85 for cert in identity["others"]:
86 certifications["received_expire"].append(
87 expiration_date_from_block_id(
88 cert["meta"]["block_number"],
89 time_first_block,
90 params,
91 ),
92 )
93 certifications["received"].append(
94 cert_written_in_the_blockchain(req["certifications"], cert),
95 )
96 (
97 certifications["sent"],
98 certifications["sent_expire"],
99 ) = get_sent_certifications(signed, time_first_block, params)
100 nbr_sent_certs = len(certifications["sent"]) if "sent" in certifications else 0
102 table = Table(style="columns").set_cols_align(["r", "r", "r", "r"])
103 table.fill_from_dict(certifications)
105 print(
106 f'{identity["uid"]} ({gen_pubkey_checksum(pubkey, True)} \
107 ) \
108from block #{identity["meta"]["timestamp"][:15]} \
109 …\n\
110received {len(certifications["received"])} \
111 and sent \
112{nbr_sent_certs}/{params["sigStock"]} \
113 certifications:\n\
114{table.draw()} \
115 \n\
116✔: Certification available to be written or already written into the blockchain\n',
117 )
118 membership_status(certifications, pubkey, req)
121def cert_written_in_the_blockchain(written_certs: Dict, certifieur: Dict):
122 for cert in written_certs:
123 if cert["from"] == certifieur["pubkey"]:
124 return certifieur["uids"][0] + " ✔"
125 return certifieur["uids"][0]
128def membership_status(certifications: OrderedDict, pubkey: str, req: Dict) -> None:
129 params = get_blockchain_parameters()
130 if len(certifications["received"]) >= params["sigQty"]:
131 date = certifications["received_expire"][
132 len(certifications["received"]) - params["sigQty"]
133 ]
134 print(f"Membership expiration due to certification expirations: {date}")
135 member_lookup = wt.is_member(pubkey)
136 is_member = bool(member_lookup)
137 print("member:", is_member)
138 if req["revoked"]:
139 revoke_date = from_timestamp(req["revoked_on"], tz="local").format(DATE)
140 print(f"revoked: {req['revoked']}\nrevoked on: {revoke_date}")
141 if not is_member and req["wasMember"]:
142 print("expired:", req["expired"], "\nwasMember:", req["wasMember"])
143 elif is_member:
144 expiration_date = now().add(seconds=req["membershipExpiresIn"]).format(DATE)
145 print(f"Membership document expiration: {expiration_date}")
146 print("Sentry:", req["isSentry"])
147 print("outdistanced:", req["outdistanced"])
150def expiration_date_from_block_id(
151 block_id: str,
152 time_first_block: int,
153 params: Dict,
154) -> str:
155 expir_timestamp = (
156 date_approximation(block_id, time_first_block, params["avgGenTime"])
157 + params["sigValidity"]
158 )
159 return from_timestamp(expir_timestamp, tz="local").format(DATE)
162def date_approximation(block_id, time_first_block, avgentime):
163 return time_first_block + block_id * avgentime