
Commit c62a974

Merge pull request #31 from WIPACrepo/query_schedd
Query schedd directly
2 parents e221a27 + 63ab78f

6 files changed, +110 -67 lines changed

condor_history_to_es.py

Lines changed: 22 additions & 12 deletions
@@ -12,20 +12,22 @@
 from rest_tools.client import ClientCredentialsAuth

 parser = ArgumentParser('usage: %prog [options] history_files')
-parser.add_argument('-a','--address',help='elasticsearch address')
-parser.add_argument('-n','--indexname',default='condor',
-        help='index name (default condor)')
+parser.add_argument('-a','--address', help='elasticsearch address')
+parser.add_argument('-n','--indexname', default='condor',
+                    help='index name (default condor)')
 parser.add_argument('--dailyindex', default=False, action='store_true',
-        help='Index pattern daily')
+                    help='Index pattern daily')
 parser.add_argument("-y", "--dry-run", default=False,
-        action="store_true",
-        help="query jobs, but do not ingest into ES",)
+                    action="store_true",
+                    help="query jobs, but do not ingest into ES",)
 parser.add_argument('--collectors', default=False, action='store_true',
-        help='Args are collector addresses, not files')
-parser.add_argument('--client_id',help='oauth2 client id',default=None)
-parser.add_argument('--client_secret',help='oauth2 client secret',default=None)
-parser.add_argument('--token_url',help='oauth2 realm token url',default=None)
-parser.add_argument('--token',help='oauth2 token',default=None)
+                    help='Args are collector addresses, not files')
+parser.add_argument('--access_points', default=None,
+                    help="Comma separated list of APs to query; e.g. --access_points submit-1,submit2")
+parser.add_argument('--client_id', help='oauth2 client id', default=None)
+parser.add_argument('--client_secret', help='oauth2 client secret', default=None)
+parser.add_argument('--token_url', help='oauth2 realm token url', default=None)
+parser.add_argument('--token', help='oauth2 token',default=None)
 parser.add_argument("positionals", nargs='+')

 options = parser.parse_args()
@@ -86,7 +88,15 @@ def es_import(document_generator):
     return success

 failed = False
-if options.collectors:
+if options.access_points and options.collectors:
+    for coll_address in options.positionals:
+        try:
+            gen = es_generator(read_from_collector(coll_address, options.access_points, history=True))
+            success = es_import(gen)
+        except htcondor.HTCondorIOError as e:
+            failed = e
+            logging.error('Condor error', exc_info=True)
+elif options.collectors:
     for coll_address in options.positionals:
         try:
             gen = es_generator(read_from_collector(coll_address, history=True))
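For orientation, a minimal sketch of how the two flags combine, assuming the updated read_from_collector signature from condor_utils.py (see that file's diff below) is importable from the module; the collector host and access point names are placeholders, not real hosts:

    # Hypothetical invocation of the updated script:
    #   condor_history_to_es.py --collectors --access_points submit-1,submit-2 collector.example.edu
    # which is roughly equivalent to:
    from condor_utils import read_from_collector

    for job_ad in read_from_collector('collector.example.edu',            # positional: collector address
                                      access_points='submit-1,submit-2',  # comma-separated AP names
                                      history=True):                      # schedd history, not the active queue
        print(job_ad.get('GlobalJobId'))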

condor_history_to_prometheus.py

Lines changed: 43 additions & 31 deletions
@@ -9,6 +9,7 @@
 import prometheus_client
 from datetime import datetime
 from collections import defaultdict
+from socket import gethostbyname

 utc_format = '%Y-%m-%dT%H:%M:%S'
@@ -19,32 +20,39 @@ def generate_ads(entries):

 def last_jobs_dict(collector):
     last_job = defaultdict(dict)
-
+
     for collector in args:
         schedd_ads = locate_schedds(collector)
         if schedd_ads is None:
             return None
-
+
     for s in schedd_ads:
         last_job[s.get('Name')] = {'ClusterId': None, 'EnteredCurrentStatus': None}

     return last_job
-

-def locate_schedds(collector):
-    try:
-        coll = htcondor.Collector(collector)
-        return coll.locateAll(htcondor.DaemonTypes.Schedd)
-    except htcondor.HTCondorIOError as e:
-        failed = e
-        logging.error(f'Condor error: {e}')
+
+def locate_schedds(collector, access_points=None):
+    coll = htcondor.Collector(collector)
+    schedds = []
+    if access_points:
+        try:
+            # locate each named access point individually
+            for ap in access_points.split(','):
+                schedds.append(coll.locate(htcondor.DaemonTypes.Schedd, ap))
+        except htcondor.HTCondorIOError as e:
+            logging.error(f'Condor error: {e}')
+    else:
+        try:
+            # fall back to every schedd the collector knows about
+            schedds.extend(coll.locateAll(htcondor.DaemonTypes.Schedd))
+        except htcondor.HTCondorIOError as e:
+            logging.error(f'Condor error: {e}')
+    return schedds

 def compose_ad_metrics(ad, metrics):
     ''' Parse condor job classad and update metrics

     Args:
         ad (classad): an HTCondor job classad
-        metrics (JobMetrics): JobMetrics object
+        metrics (JobMetrics): JobMetrics object
     '''
     # ignore this ad if walltimehrs is negative or a dagman
     if ad['walltimehrs'] < 0 or ad['Cmd'] == '/usr/bin/condor_dagman':
@@ -64,7 +72,7 @@ def compose_ad_metrics(ad, metrics):
     labels['site'] = ad['site']
     labels['schedd'] = ad['GlobalJobId'][0:ad['GlobalJobId'].find('#')]
     labels['GPUDeviceName'] = None
-
+
     if ad['ExitCode'] == 0 and ad['ExitBySignal'] is False and ad['JobStatus'] == 4:
         labels['usage'] = 'goodput'
     else:
@@ -83,7 +91,7 @@ def compose_ad_metrics(ad, metrics):
         resource_hrs = ad['cpuhrs']
         resource_request = ad['RequestCpus']

-    try:
+    try:
         labels['IceProdDataset'] = ad['IceProdDataset']
         labels['IceProdTaskName'] = ad['IceProdTaskName']
     except:
@@ -100,39 +108,27 @@ def compose_ad_metrics(ad, metrics):
     metrics.condor_job_mem_req.labels(**labels).observe(ad['RequestMemory']/1024)
     metrics.condor_job_mem_used.labels(**labels).observe(ad['ResidentSetSize_RAW']/1048576)

-def query_collector(collector, metrics, last_job):
+def query_collector(collector, access_points, metrics, last_job):
     """Query schedds for job ads

     Args:
         collector (str): address for a collector to query
+        access_points (str): comma-separated access point names, or None for all schedds
         metrics (JobMetrics): JobMetrics instance
         last_job (dict): dictionary for tracking last ClusterId by schedd
     """
-    for schedd_ad in locate_schedds(collector):
+    for schedd_ad in locate_schedds(collector, access_points):
         name = schedd_ad.get('Name')

         ads = read_from_schedd(schedd_ad, history=True, since=last_job[name]['ClusterId'])
-        if last_job[name]['EnteredCurrentStatus'] is not None:
-            logging.info(f'{name} - read ads since {last_job[name]["ClusterId"]}:{last_job[name]["EnteredCurrentStatus"]} at timestamp {datetime.strptime(last_job[name]["EnteredCurrentStatus"],utc_format)}')
-
-        for ad in generate_ads(ads):
-            if last_job[name]['ClusterId'] is None:
-                last_job[name]['ClusterId'] = int(ad['ClusterId'])
-                last_job[name]['EnteredCurrentStatus'] = ad['EnteredCurrentStatus']
-
-            if datetime.strptime(ad['EnteredCurrentStatus'],utc_format) > datetime.strptime(last_job[name]['EnteredCurrentStatus'],utc_format):
-                last_job[name]['ClusterId'] = int(ad['ClusterId'])
-                last_job[name]['EnteredCurrentStatus'] = ad['EnteredCurrentStatus']
-
-            compose_ad_metrics(ad, metrics)
+        iterate_ads(ads, name, metrics, last_job)

 def read_from_schedd(schedd_ad, history=False, constraint='true', projection=[],match=10000,since=None):
     """Connect to schedd and pull ads directly.

     A generator that yields condor job dicts.

     Args:
-        schedd (ClassAd): location_add of a schedd, from either htcondor.Colletor locate() or locateAll()
+        schedd (ClassAd): location_ad of a schedd, from either htcondor.Collector locate() or locateAll()
         history (bool): read history (True) or active queue (default: False)
         constraint (string): string representation of a classad expression
         match (int): number of job ads to return
@@ -158,6 +154,21 @@ def read_from_schedd(schedd_ad, history=False, constraint='true', projection=[],
     except Exception:
         logging.info('%s failed', schedd_ad['Name'], exc_info=True)

+def iterate_ads(ads, name, metrics, last_job):
+    if last_job[name]['EnteredCurrentStatus'] is not None:
+        logging.info(f'{name} - read ads since {last_job[name]["ClusterId"]}:{last_job[name]["EnteredCurrentStatus"]} at timestamp {datetime.strptime(last_job[name]["EnteredCurrentStatus"],utc_format)}')
+
+    for ad in generate_ads(ads):
+        if last_job[name]['ClusterId'] is None:
+            last_job[name]['ClusterId'] = int(ad['ClusterId'])
+            last_job[name]['EnteredCurrentStatus'] = ad['EnteredCurrentStatus']
+
+        if datetime.strptime(ad['EnteredCurrentStatus'],utc_format) > datetime.strptime(last_job[name]['EnteredCurrentStatus'],utc_format):
+            last_job[name]['ClusterId'] = int(ad['ClusterId'])
+            last_job[name]['EnteredCurrentStatus'] = ad['EnteredCurrentStatus']
+
+        compose_ad_metrics(ad, metrics)
+
 if __name__ == '__main__':
     logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(name)s : %(message)s')
@@ -168,6 +179,7 @@ def read_from_schedd(schedd_ad, history=False, constraint='true', projection=[],
     # TODO: Add file tail function for condor history files
     #parser.add_option('-f','--histfile',
     #                  help='history file to read from')
+    parser.add_option('-a','--access_points',default=None)
     parser.add_option('-p','--port', default=9100,
                       action='store', type='int',
                       help='port number for prometheus exporter')
@@ -196,10 +208,10 @@ def read_from_schedd(schedd_ad, history=False, constraint='true', projection=[],
     while True:
         start = time.time()
         for collector in args:
-            query_collector(collector, metrics, last_job)
+            query_collector(collector, options.access_points, metrics, last_job)

         delta = time.time() - start
         # sleep for interval minus scrape duration
         # if scrape duration was longer than interval, run right away
         if delta < options.interval:
-            time.sleep(options.interval - delta)
+            time.sleep(options.interval - delta)
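The bookkeeping that moved into iterate_ads can be exercised on its own. A toy walkthrough with fabricated ads — the field names mirror the classad attributes used above, the schedd name and values are made up:

    from collections import defaultdict
    from datetime import datetime

    utc_format = '%Y-%m-%dT%H:%M:%S'

    # Two fake history ads from one schedd, oldest first.
    ads = [
        {'ClusterId': 100, 'EnteredCurrentStatus': '2023-01-01T00:00:00'},
        {'ClusterId': 101, 'EnteredCurrentStatus': '2023-01-01T06:00:00'},
    ]

    last_job = defaultdict(dict)
    last_job['submit-1'] = {'ClusterId': None, 'EnteredCurrentStatus': None}

    for ad in ads:
        state = last_job['submit-1']
        # First ad seen, or an ad newer than the stored watermark: advance it.
        if state['ClusterId'] is None or (
                datetime.strptime(ad['EnteredCurrentStatus'], utc_format)
                > datetime.strptime(state['EnteredCurrentStatus'], utc_format)):
            state['ClusterId'] = int(ad['ClusterId'])
            state['EnteredCurrentStatus'] = ad['EnteredCurrentStatus']

    # The next cycle queries the schedd with since=101, skipping seen history.
    print(last_job['submit-1'])  # {'ClusterId': 101, 'EnteredCurrentStatus': '2023-01-01T06:00:00'}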

condor_queue_to_es.py

Lines changed: 11 additions & 3 deletions
@@ -19,6 +19,8 @@
                     help="query jobs, but do not ingest into ES",)
 parser.add_argument('--collectors', default=False, action='store_true',
                     help='Args are collector addresses, not files')
+parser.add_argument('--access_points', default=None,
+                    help="Comma separated list of APs to query; e.g. --access_points submit-1,submit2")
 parser.add_argument('--client_id',help='oauth2 client id',default=None)
 parser.add_argument('--client_secret',help='oauth2 client secret',default=None)
 parser.add_argument('--token_url',help='oauth2 realm token url',default=None)
@@ -61,8 +63,6 @@ def es_generator(entries):
 prefix = 'http'
 address = options.address

-
-
 if '://' in address:
     prefix,address = address.split('://')
@@ -84,7 +84,15 @@ def es_generator(entries):
 es_import = partial(bulk, es, max_retries=20, initial_backoff=10, max_backoff=3600)

 failed = False
-if options.collectors:
+if options.access_points and options.collectors:
+    for coll_address in options.positionals:
+        try:
+            gen = es_generator(read_from_collector(coll_address, options.access_points))
+            success, _ = es_import(gen)
+        except htcondor.HTCondorIOError as e:
+            failed = e
+            logging.error('Condor error', exc_info=True)
+elif options.collectors:
     for coll_address in options.positionals:
         try:
             gen = es_generator(read_from_collector(coll_address))
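The `success, _ = es_import(gen)` unpacking works because elasticsearch.helpers.bulk returns a (successes, errors) tuple. A self-contained sketch with a placeholder ES address and fabricated documents:

    from functools import partial

    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import bulk

    es = Elasticsearch(hosts=['http://localhost:9200'])  # placeholder address

    # Same retry/backoff settings as the script above.
    es_import = partial(bulk, es, max_retries=20, initial_backoff=10, max_backoff=3600)

    # bulk() consumes any iterable of actions and returns (successes, errors).
    docs = ({'_index': 'condor', '_source': {'ClusterId': i}} for i in range(3))
    success, _errors = es_import(docs)
    print(success)  # 3 if every document was indexed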

condor_queue_to_prometheus.py

Lines changed: 22 additions & 15 deletions
@@ -75,7 +75,7 @@ def compose_ad_metrics(ads):

 parser.add_option('-c','--collectors',default=False, action='store_true',
                   help='read history from')
-
+parser.add_option('-a','--access_points',default=None)
 parser.add_option('-p','--port', default=9100,
                   action='store', type='int',
                   help='port number for prometheus exporter')
@@ -94,27 +94,34 @@ def compose_ad_metrics(ads):

 prometheus_client.start_http_server(options.port)

-if options.collectors:
-    while True:
-        gens = []
-        start = time.time()
+while True:
+    gens = []
+    start = time.time()
+    if options.access_points and options.collectors:
+        for coll_address in args:
+            try:
+                gens.append(read_from_collector(coll_address, options.access_points))
+            except htcondor.HTCondorIOError as e:
+                failed = e
+                logging.error('Condor error', exc_info=True)
+    elif options.collectors:
         for coll_address in args:
             try:
                 gens.append(read_from_collector(coll_address))
             except htcondor.HTCondorIOError as e:
                 failed = e
                 logging.error('Condor error', exc_info=True)
-        gen = chain(*gens)
-        metrics.clear()
+    gen = chain(*gens)
+    metrics.clear()

-        start_compose_metrics = time.perf_counter()
-        compose_ad_metrics(generate_ads(gen))
-        end_compose_metrics = time.perf_counter()
+    start_compose_metrics = time.perf_counter()
+    compose_ad_metrics(generate_ads(gen))
+    end_compose_metrics = time.perf_counter()

-        compose_diff = end_compose_metrics - start_compose_metrics
-        logging.info(f'Took {compose_diff} seconds to compose metrics')
+    compose_diff = end_compose_metrics - start_compose_metrics
+    logging.info(f'Took {compose_diff} seconds to compose metrics')

-        delta = time.time() - start
+    delta = time.time() - start

-        if delta < options.interval:
-            time.sleep(options.interval - delta)
+    if delta < options.interval:
+        time.sleep(options.interval - delta)
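The pacing of the rewritten loop is worth spelling out: it sleeps only for the remainder of the interval, and skips sleeping entirely when a scrape overruns. A standalone sketch, where scrape() is a hypothetical stand-in for the collector queries above:

    import time

    INTERVAL = 300  # seconds; stands in for options.interval


    def scrape():
        """Hypothetical placeholder for the collector/schedd queries."""
        time.sleep(1)


    while True:
        start = time.time()
        scrape()

        # Sleep for interval minus scrape duration; if the scrape took
        # longer than the interval, start the next one immediately.
        delta = time.time() - start
        if delta < INTERVAL:
            time.sleep(INTERVAL - delta)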

condor_status_to_es.py

Lines changed: 4 additions & 4 deletions
@@ -35,7 +35,7 @@
 class Dry:
     """Helper class for debugging"""
     _dryrun = False
-
+
     def __init__(self, func):
         self.func = func
@@ -44,7 +44,7 @@ def __call__(self, *args, **kwargs):
             logging.info(self.func.__name__)
             logging.info(args)
             logging.info(kwargs)
-
+
         else:
             return self.func(*args,**kwargs)
@@ -296,7 +296,7 @@ def key = status+"."+resource;
     "--after", default=timedelta(hours=1), help="time to look back", type=parse_time
 )
 parser.add_argument(
-    "-y",
+    "-d",
     "--dry-run",
    default=False,
     action="store_true",
@@ -353,7 +353,7 @@ def key = status+"."+resource;

 url = "{}://{}".format(prefix, address)
 logging.info("connecting to ES at %s", url)
-es = Elasticsearch(hosts=[url],
+es = Elasticsearch(hosts=[url],
                    timeout=5000,
                    bearer_auth=token,
                    sniff_on_node_failure=True)

condor_utils.py

Lines changed: 8 additions & 2 deletions
@@ -761,7 +761,7 @@ def read_from_file(filename):
     else:
         entry += line+'\n'

-def read_from_collector(address, history=False, constraint='true', projection=[],match=10000):
+def read_from_collector(address, access_points=None, history=False, constraint='true', projection=[], match=10000):
     """Connect to condor collectors and schedds to pull job ads directly.

     A generator that yields condor job dicts.
@@ -772,7 +772,12 @@ def read_from_collector(address, history=False, constraint='true', projection=[]
     """
     import htcondor
     coll = htcondor.Collector(address)
-    schedd_ads = coll.locateAll(htcondor.DaemonTypes.Schedd)
+    schedd_ads = []
+    if access_points:
+        for ap in access_points.split(','):
+            schedd_ads.append(coll.locate(htcondor.DaemonTypes.Schedd, ap))
+    else:
+        schedd_ads = coll.locateAll(htcondor.DaemonTypes.Schedd)
     for schedd_ad in schedd_ads:
         logging.info('getting job ads from %s', schedd_ad['Name'])
         schedd = htcondor.Schedd(schedd_ad)
@@ -790,6 +795,7 @@ def read_from_collector(address, history=False, constraint='true', projection=[]
         except Exception:
             logging.info('%s failed', schedd_ad['Name'], exc_info=True)

+
 def read_status_from_collector(address, after=datetime.now()-timedelta(hours=1)):
     """Connect to condor collectors and schedds to pull job ads directly.
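The heart of the change: with access_points set, the collector is asked to locate each named schedd rather than everything it advertises. A hedged sketch of the two paths, with placeholder host and schedd names:

    import htcondor

    coll = htcondor.Collector('collector.example.edu')  # placeholder pool address

    # New path: locate only the named access points.
    named = [coll.locate(htcondor.DaemonTypes.Schedd, ap)
             for ap in 'submit-1,submit-2'.split(',')]

    # Old path: every schedd registered with the collector.
    all_schedds = coll.locateAll(htcondor.DaemonTypes.Schedd)

    # Either way, each location ad can seed a Schedd client for direct queries.
    for ad in named:
        schedd = htcondor.Schedd(ad)
        print(ad['Name'])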
