Coverage for _build/src/powerprofilesctl: 82%

164 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2024-09-13 00:56 +0000

1#!/usr/bin/env python3 

2 

3import argparse 

4import os 

5import signal 

6import subprocess 

7import sys 

8from gi.repository import Gio, GLib 

9 

# Bus name passed to Gio.DBusProxy when connecting to power-profiles-daemon.
PP_NAME = "org.freedesktop.UPower.PowerProfiles"

# Object path of the daemon's exported object.
PP_PATH = "/org/freedesktop/UPower/PowerProfiles"

# Interface that carries the profile properties and the hold/release methods.
PP_IFACE = "org.freedesktop.UPower.PowerProfiles"

# Generic D-Bus properties interface used for Get/Set calls.
PROPERTIES_IFACE = "org.freedesktop.DBus.Properties"

14 

15 

def get_proxy():
    """Return a synchronous D-Bus proxy for the daemon's properties interface.

    The proxy is created on the system bus for ``PP_NAME`` at ``PP_PATH`` and
    is bound to ``PROPERTIES_IFACE``, so callers use it for ``Get``/``Set``.
    """
    system_bus = Gio.bus_get_sync(Gio.BusType.SYSTEM, None)
    properties_proxy = Gio.DBusProxy.new_sync(
        system_bus,
        Gio.DBusProxyFlags.NONE,
        None,
        PP_NAME,
        PP_PATH,
        PROPERTIES_IFACE,
        None,
    )
    return properties_proxy

21 

22 

def command(func):
    """Decorate a sub-command handler with uniform error reporting.

    The returned wrapper forwards all arguments to *func* and hands back its
    return value. Two failure modes are turned into a message on stderr
    followed by exit status 1:

    * ``GLib.Error`` -- reported as a failure to communicate with
      power-profiles-daemon.
    * ``ValueError`` -- reported as ``Error: <message>``.

    Any other exception propagates unchanged.
    """
    # Imported locally so this block does not depend on a new module-level import.
    import functools  # pylint: disable=import-outside-toplevel

    # Preserve the handler's __name__/__doc__ so tracebacks and introspection
    # show the real sub-command rather than "wrapper".
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except GLib.Error as error:
            sys.stderr.write(
                f"Failed to communicate with power-profiles-daemon: {error}\n"
            )
            sys.exit(1)
        except ValueError as error:
            sys.stderr.write(f"Error: {error}\n")
            sys.exit(1)

    return wrapper

37 

38 

@command
def _version(_args):
    """Print the client version and the version reported by the daemon.

    When querying the daemon fails with ``GLib.Error`` the daemon version is
    shown as ``unknown`` instead of aborting the command.
    """
    client_version = "0.23"
    try:
        daemon_ver = get_proxy().Get("(ss)", PP_IFACE, "Version")
    except GLib.Error:
        # Still report the client version when the daemon cannot be queried.
        daemon_ver = "unknown"
    print(f"client: {client_version}\ndaemon: {daemon_ver}")

48 

49 

@command
def _get(_args):
    """Print the daemon's ``ActiveProfile`` property."""
    print(get_profiles_property("ActiveProfile"))

55 

56 

@command
def _set(args):
    """Set the daemon's ``ActiveProfile`` property to ``args.profile[0]``.

    ``args.profile`` is indexed because the ``set`` sub-command declares the
    positional with ``nargs=1``, which yields a one-element list.
    """
    properties = get_proxy()
    requested_profile = GLib.Variant.new_string(args.profile[0])
    properties.Set("(ssv)", PP_IFACE, "ActiveProfile", requested_profile)

63 

64 

def get_profiles_property(prop):
    """Return the value of property *prop* on the ``PP_IFACE`` interface.

    A fresh proxy is obtained from ``get_proxy()`` for every call.
    """
    properties = get_proxy()
    value = properties.Get("(ss)", PP_IFACE, prop)
    return value

68 

69 

@command
def _list(_args):
    """Print every power profile known to the daemon.

    Profiles are printed in the reverse of the order returned by the
    ``Profiles`` property, separated by blank lines. The active profile is
    marked with ``*``. For each profile the ``CpuDriver`` and
    ``PlatformDriver`` entries are shown when present, and the
    ``performance`` profile additionally reports whether performance is
    degraded (``PerformanceDegraded`` is a non-empty reason string).
    """
    # A single proxy serves all three property reads; building a new proxy
    # for each read only repeats the synchronous D-Bus setup.
    proxy = get_proxy()
    profiles = proxy.Get("(ss)", PP_IFACE, "Profiles")
    reason = proxy.Get("(ss)", PP_IFACE, "PerformanceDegraded")
    active = proxy.Get("(ss)", PP_IFACE, "ActiveProfile")
    degraded = reason != ""

    # NOTE(review): the listing this code was recovered from may have
    # collapsed runs of spaces inside string literals -- confirm the label
    # padding below against the real script.
    for index, profile in enumerate(reversed(profiles)):
        if index > 0:
            # Blank line between consecutive profile entries.
            print("")
        name = profile["Profile"]
        marker = "*" if name == active else " "
        print(f"{marker} {name}:")
        for driver in ("CpuDriver", "PlatformDriver"):
            if driver in profile:
                print(f" {driver}:\t{profile[driver]}")
        if name == "performance":
            print(" Degraded: ", f"yes ({reason})" if degraded else "no")

91 

92 

@command
def _list_holds(_args):
    """Print the profile holds listed in the ``ActiveProfileHolds`` property.

    Each hold is printed as a ``Hold:`` header followed by its profile,
    application ID and reason; holds are separated by a blank line.
    """
    # NOTE(review): the listing this code was recovered from may have
    # collapsed runs of spaces inside string literals -- confirm the label
    # padding below against the real script.
    labelled_keys = (
        (" Profile: ", "Profile"),
        (" Application ID: ", "ApplicationId"),
        (" Reason: ", "Reason"),
    )
    for position, hold in enumerate(get_profiles_property("ActiveProfileHolds")):
        if position > 0:
            print("")
        print("Hold:")
        for label, key in labelled_keys:
            print(label, hold[key])

106 

107 

@command
def _launch(args):
    """Run a command while holding a power profile, then release the hold.

    Reads ``args.arguments`` (the command line to run), ``args.profile``,
    ``args.reason`` and ``args.appid``. Missing values default to the
    ``performance`` profile, the first word of the command as application
    id, and ``Running <appid>`` as reason.

    SIGTERM, SIGINT and SIGABRT received while the child runs are forwarded
    to it. When the child exits normally this process exits with the same
    status; when the child was killed by a signal, that signal is re-sent to
    this process.

    Raises:
        ValueError: if no command was given.
    """
    if not args.arguments:
        raise ValueError("No command to launch")

    appid = args.appid or args.arguments[0]
    profile = args.profile or "performance"
    # Use the resolved appid here: args.appid is None when --appid is omitted,
    # which used to yield the reason "Running None".
    reason = args.reason or f"Running {appid}"

    bus = Gio.bus_get_sync(Gio.BusType.SYSTEM, None)
    proxy = Gio.DBusProxy.new_sync(
        bus, Gio.DBusProxyFlags.NONE, None, PP_NAME, PP_PATH, PP_IFACE, None
    )
    cookie = proxy.HoldProfile("(sss)", profile, reason, appid)

    redirected_signals = [
        signal.SIGTERM,
        signal.SIGINT,
        signal.SIGABRT,
    ]

    try:
        with subprocess.Popen(args.arguments) as launched_app:
            # Redirect the same signal to the child
            def receive_signal(signum, _stack):
                launched_app.send_signal(signum)

            for sig in redirected_signals:
                signal.signal(sig, receive_signal)

            try:
                launched_app.wait()
            except KeyboardInterrupt:
                # The child is reaped when the ``with`` block exits.
                pass
            finally:
                for sig in redirected_signals:
                    signal.signal(sig, signal.SIG_DFL)

        # Leaving the ``with`` block waits for the child, so the return code
        # is always an integer here, including after an interrupted wait.
        ret = launched_app.returncode
    finally:
        # Drop the hold even when spawning or waiting for the child failed.
        proxy.ReleaseProfile("(u)", cookie)

    if ret < 0:
        # Use standard POSIX signal exit code.
        os.kill(os.getpid(), -ret)
        return

    sys.exit(ret)

160 

161 

def get_parser():
    """Build the command-line parser with one sub-parser per command.

    Every sub-parser stores its handler in the ``func`` default and the
    selected command name ends up in ``args.command``. When the
    ``PPD_COMPLETIONS_GENERATION`` environment variable is set and ``shtab``
    can be imported, a ``--print-completion`` option is added as well.
    """
    parser = argparse.ArgumentParser(
        epilog="Use “powerprofilesctl COMMAND --help” to get detailed help for individual commands",
    )
    commands = parser.add_subparsers(help="Individual command help", dest="command")

    list_cmd = commands.add_parser("list", help="List available power profiles")
    list_cmd.set_defaults(func=_list)

    list_holds_cmd = commands.add_parser(
        "list-holds", help="List current power profile holds"
    )
    list_holds_cmd.set_defaults(func=_list_holds)

    get_cmd = commands.add_parser(
        "get", help="Print the currently active power profile"
    )
    get_cmd.set_defaults(func=_get)

    set_cmd = commands.add_parser(
        "set", help="Set the currently active power profile"
    )
    set_cmd.add_argument("profile", nargs=1, help="Profile to use for set command")
    set_cmd.set_defaults(func=_set)

    launch_cmd = commands.add_parser(
        "launch",
        help="Launch a command while holding a power profile",
        description="Launch the command while holding a power profile, "
        "either performance, or power-saver. By default, the profile hold "
        "is for the performance profile, but it might not be available on "
        "all systems. See the list command for a list of available profiles.",
    )
    launch_cmd.add_argument("arguments", nargs="*", help="Command to launch")
    launch_options = (
        (("--profile", "-p"), "Profile to use for launch command"),
        (("--reason", "-r"), "Reason to use for launch command"),
        (("--appid", "-i"), "AppId to use for launch command"),
    )
    for flags, help_text in launch_options:
        launch_cmd.add_argument(*flags, required=False, help=help_text)
    launch_cmd.set_defaults(func=_launch)

    version_cmd = commands.add_parser(
        "version", help="Print version information and exit"
    )
    version_cmd.set_defaults(func=_version)

    if os.getenv("PPD_COMPLETIONS_GENERATION"):
        # shtab is optional; without it the parser is returned unchanged.
        try:
            import shtab  # pylint: disable=import-outside-toplevel

            shtab.add_argument_to(parser, ["--print-completion"])  # magic!
        except ImportError:
            pass

    return parser

225 

226 

def check_unknown_args(args, unknown_args, cmd):
    """Decide whether unrecognized arguments may be accepted for *cmd*.

    Only the ``launch`` command can accept them. Each entry of
    *unknown_args* is compared with the entry of *args* at the same index:
    reaching *cmd* in *args* first accepts the unknown arguments, while an
    unknown argument sitting at its own index in *args* rejects them. If
    neither happens the unknown arguments are accepted.

    Returns:
        ``True`` when the unknown arguments are acceptable, else ``False``.
    """
    if cmd == "launch":
        for position, candidate in enumerate(unknown_args):
            current = args[position]
            if current == cmd:
                return True
            if candidate == current:
                return False
        return True

    return False

239 

240 

def main():
    """Parse the command line and run the selected sub-command.

    Without a sub-command the profile list is printed. Unrecognized
    arguments are appended to the ``launch`` command line when
    ``check_unknown_args`` accepts them; otherwise they are reported through
    the parser's usual "unrecognized arguments" error.
    """
    parser = get_parser()
    args, unknown = parser.parse_known_args()

    # default behavior is to run list if no command is given
    if not args.command:
        args.func = _list

    if check_unknown_args(sys.argv[1:], unknown, args.command):
        # Hand the extra arguments over to the launched command.
        args.arguments += unknown
    elif unknown:
        msg = argparse._("unrecognized arguments: %s")
        parser.error(msg % " ".join(unknown))

    args.func(args)

257 

258 

# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()