Coverage for _build/src/powerprofilesctl: 82%
164 statements
« prev ^ index » next coverage.py v7.3.2, created at 2024-09-13 00:56 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2024-09-13 00:56 +0000
1#!/usr/bin/env python3
3import argparse
4import os
5import signal
6import subprocess
7import sys
8from gi.repository import Gio, GLib
# D-Bus coordinates used to talk to power-profiles-daemon.
PP_NAME = "org.freedesktop.UPower.PowerProfiles"  # bus name
PP_PATH = "/org/freedesktop/UPower/PowerProfiles"  # object path
PP_IFACE = "org.freedesktop.UPower.PowerProfiles"  # interface holding the properties
# Standard D-Bus interface the proxies are created on (used for Get/Set calls).
PROPERTIES_IFACE = "org.freedesktop.DBus.Properties"
def get_proxy():
    """Return a synchronous D-Bus proxy for the daemon's properties interface.

    The proxy is created on the system bus for ``PP_NAME`` at ``PP_PATH`` and
    is bound to ``PROPERTIES_IFACE``, so callers use it for ``Get``/``Set``.
    """
    system_bus = Gio.bus_get_sync(Gio.BusType.SYSTEM, None)
    proxy_flags = Gio.DBusProxyFlags.NONE
    return Gio.DBusProxy.new_sync(
        system_bus, proxy_flags, None, PP_NAME, PP_PATH, PROPERTIES_IFACE, None
    )
def command(func):
    """Decorate a sub-command handler with uniform error reporting.

    The returned wrapper calls ``func`` with the arguments it receives and
    forwards its return value. A ``GLib.Error`` is reported on stderr as a
    failure to communicate with power-profiles-daemon, a ``ValueError`` as a
    plain error; in both cases the process exits with status 1. Any other
    exception propagates unchanged.
    """
    # Local import keeps this block self-contained.
    import functools  # pylint: disable=import-outside-toplevel

    # functools.wraps keeps the handler's __name__/__doc__ on the wrapper so
    # decorated commands stay identifiable in tracebacks and introspection.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except GLib.Error as error:
            sys.stderr.write(
                f"Failed to communicate with power-profiles-daemon: {error}\n"
            )
            sys.exit(1)
        except ValueError as error:
            sys.stderr.write(f"Error: {error}\n")
            sys.exit(1)

    return wrapper
@command
def _version(_args):
    """Print the client version and the daemon version.

    If querying the daemon's ``Version`` property raises ``GLib.Error``,
    the daemon version is printed as ``unknown`` instead of failing.
    """
    client_version = "0.23"
    try:
        daemon_ver = get_proxy().Get("(ss)", PP_IFACE, "Version")
    except GLib.Error:
        # The client version is still worth printing when the daemon is unreachable.
        daemon_ver = "unknown"
    print(f"client: {client_version}\ndaemon: {daemon_ver}")
@command
def _get(_args):
    """Print the value of the daemon's ``ActiveProfile`` property."""
    print(get_proxy().Get("(ss)", PP_IFACE, "ActiveProfile"))
@command
def _set(args):
    """Set the daemon's ``ActiveProfile`` property to ``args.profile[0]``."""
    proxy = get_proxy()
    # The Properties.Set signature "(ssv)" needs the new value boxed in a variant.
    requested = GLib.Variant.new_string(args.profile[0])
    proxy.Set("(ssv)", PP_IFACE, "ActiveProfile", requested)
def get_profiles_property(prop):
    """Return the value of property ``prop`` on ``PP_IFACE``, read via a fresh proxy."""
    return get_proxy().Get("(ss)", PP_IFACE, prop)
@command
def _list(_args):
    """Print every profile reported by the daemon, marking the active one.

    Profiles are printed in the reverse of the order the daemon returns
    them, separated by blank lines. The active profile is prefixed with
    ``*``. For each profile the ``CpuDriver`` and ``PlatformDriver`` entries
    are shown when present, and the ``performance`` profile additionally
    shows whether performance is degraded (non-empty ``PerformanceDegraded``).
    """
    # One proxy serves all three property reads; building a new bus proxy
    # per property is redundant work.
    proxy = get_proxy()
    profiles = proxy.Get("(ss)", PP_IFACE, "Profiles")
    reason = proxy.Get("(ss)", PP_IFACE, "PerformanceDegraded")
    active = proxy.Get("(ss)", PP_IFACE, "ActiveProfile")
    degraded = reason != ""

    for index, profile in enumerate(reversed(profiles)):
        if index > 0:
            # Blank line between consecutive profile entries.
            print("")
        name = profile["Profile"]
        marker = "*" if name == active else " "
        print(f'{marker} {name}:')
        for driver in ("CpuDriver", "PlatformDriver"):
            if driver in profile:
                print(f" {driver}:\t{profile[driver]}")
        if name == "performance":
            print(" Degraded: ", f"yes ({reason})" if degraded else "no")
@command
def _list_holds(_args):
    """Print each entry of the daemon's ``ActiveProfileHolds`` property.

    Every hold is shown with its profile, application ID and reason;
    consecutive holds are separated by a blank line.
    """
    for position, hold in enumerate(get_profiles_property("ActiveProfileHolds")):
        if position:
            # Separator before every hold except the first.
            print("")
        print("Hold:")
        print(" Profile: ", hold["Profile"])
        print(" Application ID: ", hold["ApplicationId"])
        print(" Reason: ", hold["Reason"])
@command
def _launch(args):
    """Run a command while holding a power profile, then release the hold.

    Reads ``args.arguments`` (the command line to run), ``args.profile``
    (defaults to ``"performance"``), ``args.appid`` (defaults to the first
    element of the command) and ``args.reason`` (defaults to
    ``"Running <appid>"``).

    SIGTERM, SIGINT and SIGABRT received while the child runs are forwarded
    to it. Once the child is done the hold is released and this process
    exits with the child's return code, or re-raises the signal on itself
    when the child's return code is negative.

    Raises:
        ValueError: if no command was given (reported by ``@command``).
    """
    reason = args.reason
    profile = args.profile
    appid = args.appid
    if not args.arguments:
        raise ValueError("No command to launch")
    if not appid:
        appid = args.arguments[0]
    if not profile:
        profile = "performance"
    if not reason:
        # Use the resolved appid: args.appid is None when --appid is omitted,
        # which previously produced the reason "Running None".
        reason = f"Running {appid}"
    ret = 0
    bus = Gio.bus_get_sync(Gio.BusType.SYSTEM, None)
    # HoldProfile/ReleaseProfile live on the daemon's own interface, so this
    # proxy is bound to PP_IFACE rather than the properties interface.
    proxy = Gio.DBusProxy.new_sync(
        bus, Gio.DBusProxyFlags.NONE, None, PP_NAME, PP_PATH, PP_IFACE, None
    )
    cookie = proxy.HoldProfile("(sss)", profile, reason, appid)

    with subprocess.Popen(args.arguments) as launched_app:
        # Redirect the same signal to the child
        def receive_signal(signum, _stack):
            launched_app.send_signal(signum)

        redirected_signals = [
            signal.SIGTERM,
            signal.SIGINT,
            signal.SIGABRT,
        ]

        for sig in redirected_signals:
            signal.signal(sig, receive_signal)

        try:
            launched_app.wait()
            ret = launched_app.returncode
        except KeyboardInterrupt:
            ret = launched_app.returncode

        # Restore default handling so a re-raised signal below takes effect.
        for sig in redirected_signals:
            signal.signal(sig, signal.SIG_DFL)

        proxy.ReleaseProfile("(u)", cookie)

    if ret < 0:
        # Use standard POSIX signal exit code.
        os.kill(os.getpid(), -ret)
        return

    sys.exit(ret)
def get_parser():
    """Build the command-line parser for powerprofilesctl.

    Registers the ``list``, ``list-holds``, ``get``, ``set``, ``launch`` and
    ``version`` sub-commands, each with its handler stored as ``func``.
    When the ``PPD_COMPLETIONS_GENERATION`` environment variable is set and
    ``shtab`` can be imported, a ``--print-completion`` option is added.
    """
    root = argparse.ArgumentParser(
        epilog="Use “powerprofilesctl COMMAND --help” to get detailed help for individual commands",
    )
    commands = root.add_subparsers(help="Individual command help", dest="command")

    list_cmd = commands.add_parser("list", help="List available power profiles")
    list_cmd.set_defaults(func=_list)

    holds_cmd = commands.add_parser(
        "list-holds", help="List current power profile holds"
    )
    holds_cmd.set_defaults(func=_list_holds)

    get_cmd = commands.add_parser(
        "get", help="Print the currently active power profile"
    )
    get_cmd.set_defaults(func=_get)

    set_cmd = commands.add_parser(
        "set", help="Set the currently active power profile"
    )
    set_cmd.add_argument(
        "profile",
        nargs=1,
        help="Profile to use for set command",
    )
    set_cmd.set_defaults(func=_set)

    launch_cmd = commands.add_parser(
        "launch",
        help="Launch a command while holding a power profile",
        description="Launch the command while holding a power profile, "
        "either performance, or power-saver. By default, the profile hold "
        "is for the performance profile, but it might not be available on "
        "all systems. See the list command for a list of available profiles.",
    )
    launch_cmd.add_argument(
        "arguments",
        nargs="*",
        help="Command to launch",
    )
    launch_cmd.add_argument(
        "--profile", "-p", required=False, help="Profile to use for launch command"
    )
    launch_cmd.add_argument(
        "--reason", "-r", required=False, help="Reason to use for launch command"
    )
    launch_cmd.add_argument(
        "--appid", "-i", required=False, help="AppId to use for launch command"
    )
    launch_cmd.set_defaults(func=_launch)

    version_cmd = commands.add_parser(
        "version", help="Print version information and exit"
    )
    version_cmd.set_defaults(func=_version)

    # Completion support is opt-in through the environment and optional:
    # a missing shtab module is silently ignored.
    if os.getenv("PPD_COMPLETIONS_GENERATION"):
        try:
            import shtab  # pylint: disable=import-outside-toplevel

            shtab.add_argument_to(root, ["--print-completion"])  # magic!
        except ImportError:
            pass

    return root
def check_unknown_args(args, unknown_args, cmd):
    """Tell whether the unrecognized arguments may be handed to ``launch``.

    ``args`` is the full argument list and ``unknown_args`` the ones the
    parser did not recognize. For any command other than ``launch`` the
    answer is always False. Otherwise both lists are walked position by
    position: reaching ``cmd`` in ``args`` first yields True, while finding
    an unknown argument equal to the argument at the same position yields
    False. If neither happens the result is True.
    """
    if cmd == "launch":
        for position, stray in enumerate(unknown_args):
            candidate = args[position]
            if candidate == cmd:
                # The command name came first, so the unknowns are accepted.
                break
            if candidate == stray:
                # An unknown option sits ahead of the command name.
                return False
        return True
    return False
def main():
    """Parse the command line and dispatch to the selected sub-command.

    Without a sub-command the ``list`` handler runs. Unrecognized arguments
    accepted by ``check_unknown_args`` are appended to the ``launch``
    command's arguments; any others abort with a parser error.
    """
    parser = get_parser()
    args, leftovers = parser.parse_known_args()

    # default behavior is to run list if no command is given
    if not args.command:
        args.func = _list

    if check_unknown_args(sys.argv[1:], leftovers, args.command):
        args.arguments += leftovers
        leftovers = []

    if leftovers:
        # Same translated message argparse emits for unrecognized arguments.
        template = argparse._("unrecognized arguments: %s")
        parser.error(template % " ".join(leftovers))

    args.func(args)
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()