1 import logging
2 import pdb
3 from pilot import *
4 from bigjob import logger
5 import bliss.saga as saga
6
7 try:
8 import json
9 except ImportError:
10 import simplejson as json
14 """
Redis Adaptor - coordination state is kept in a Redis server
16 """
17 BASE_URL="redis://localhost/"
18 SEPARATOR=":"
19
20 PILOT_PATH="pilot"
21 PILOT_DATA_PATH=PILOT_PATH
22 PILOT_DATA_SERVICE_PATH=PILOT_DATA_PATH + SEPARATOR + "pds"
23 DATA_UNIT_SERVICE_PATH=PILOT_DATA_PATH + SEPARATOR +"dus"
24 COMPUTE_DATA_SERVICE_PATH = PILOT_DATA_PATH + SEPARATOR + "cds"
25
26
27
28
29
30 @classmethod
33
34 @classmethod
36 if cls.BASE_URL==None:
37 logger.error("Coordination URL not set. Exiting Pilot-Data.")
38 raise Exception("Coordination URL not set. Exiting Pilot-Data.")
39 surl = saga.Url(cls.BASE_URL)
40 base_url = surl.scheme + "://" + surl.host + "/" + application_id
41 logger.debug(base_url)
42 return base_url
43
44
45
46
47 @classmethod
def add_pds(cls, application_url, pds):
    """Compute the URL of a Pilot-Data-Service and log its resolved form.

    The URL is built by ``cls.get_pds_url`` from ``application_url`` and
    ``pds.id``.  This method itself writes nothing to the store; it only
    emits a debug message containing the URL after it has been passed
    through ``cls.__get_url``.

    Returns the URL exactly as produced by ``get_pds_url`` (i.e. *not*
    the resolved variant that is logged).
    """
    plain_url = cls.get_pds_url(application_url, pds.id)
    # The resolved URL is needed solely for the debug message.
    resolved_url = cls.__get_url(plain_url)
    logger.debug("Create PDS directory at %s" % resolved_url)
    return plain_url
53
54
55 @classmethod
57 pds_url = cls.__get_url(pds_url)
58
59
60
61
62
63
64 @classmethod
68
69
70 @classmethod
72 du_urls=None
73 if len(pd.data_unit_urls) > 0:
74 du_urls = pd.data_unit_urls
75
76 pd_dict={
77 "data_unit_urls": du_urls,
78 "pilot_data": pd.to_dict(),
79 "pilot_data_description": pd.pilot_data_description,
80 "security_context": pd.security_context
81 }
82
83 cls.__store_entry(pd.url+RedisCoordinationAdaptor.SEPARATOR + "info", pd_dict)
84
85
86 @classmethod
91
92
93 @classmethod
95 """ return a list of urls to pd managed by the PDS """
96 pds_url = cls.__get_url(pds_url)
97 logger.debug("List PD at %s"%pds_url)
98
99
100 @classmethod
102 pds_url = cls.__get_url(pds_url)
103
104
105
106
107
108
109
110
111
112 @classmethod
def add_cds(cls, application_url, cds):
    """Compute the URL of a Compute-Data-Service and log its resolved form.

    The URL is built by ``cls.get_cds_url`` from ``application_url`` and
    ``cds.id``.  This method itself writes nothing to the store; it only
    emits a debug message containing the URL after it has been passed
    through ``cls.__get_url``.

    Returns the URL exactly as produced by ``get_cds_url`` (i.e. *not*
    the resolved variant that is logged).
    """
    plain_url = cls.get_cds_url(application_url, cds.id)
    # The resolved URL is needed solely for the debug message.
    resolved_url = cls.__get_url(plain_url)
    logger.debug("Create CDS directory at %s" % resolved_url)
    return plain_url
121
122
123 @classmethod
125
126
127 pds_urls = [i.url for i in cds.pilot_data_services]
128
129
130 pjs_urls = [i.url for i in cds.pilot_job_services]
131
132
133
134 pd_urls = [i.url for i in cds.data_units.values()]
135
136
137 wu_urls = [i.url for i in cds.compute_units.values()]
138
139
140
141 @classmethod
143 cds_url = cls.__get_url(cds_url)
144
145
146
147
148
149
150
151
152
153
154
155 @classmethod
156 - def add_du(cls, root_url, du):
160
161
162 @classmethod
164 logger.debug("**** GET DU: " + str(du_url))
165 du_dict=cls.__retrieve_entry(du_url+ RedisCoordinationAdaptor.SEPARATOR + "info")
166 logger.debug("Retrieved DU: " + du_url + " Content: " + str(du_dict))
167 return du_dict
168
169
170 @classmethod
172 logger.debug("**** Update data unit at: " + du.url)
173 du_dict_list = [i.to_dict() for i in du.data_unit_items]
174 du_urls = [i.url for i in du.pilot_data]
175 du_dict = {
176 "data_unit_description":du.data_unit_description,
177 "state": du.state,
178 "pilot_data": du_urls,
179 "data_unit_items": du_dict_list
180 }
181 cls.__store_entry(du.url + RedisCoordinationAdaptor.SEPARATOR + "info", du_dict)
182
183
184 @classmethod
186 """ return a list of urls to du managed by the PDS """
187 pd_url = cls.__get_url(pd_url)
188 logger.debug("List Data-Units of Pilot-Data at %s"%pd_url)
189 dus = cls.__list_keys(pd_url+":du-*")
190 return dus
191
192
193 @classmethod
195 du_url = cls.__get_url(du_url)
196
197
198
199
200
201
202
203
204
205
206 @classmethod
211
212
213 @classmethod
218
219
220
221
222 @classmethod
224 import redis
225 ''' Initialize Redis API Client '''
226 saga_url = saga.Url(RedisCoordinationAdaptor.BASE_URL)
227 username = saga_url.username
228 server = saga_url.host
229 server_port = saga_url.port
230 if username==None or username=="":
231 redis_client = redis.Redis(host=server, port=server_port, db=0)
232 else:
233 redis_client = redis.Redis(host=server, port=server_port, password=username, db=0)
234
235 try:
236 redis_client.ping()
237 except:
238 logger.error("Please start Redis server!")
239 raise Exception("Please start Redis server!")
240 return redis_client
241
242
243 @classmethod
246
247
248 @classmethod
250 redis_client = cls.__get_redis_api_client()
251 keys = redis_client.keys(search_url)
252 keys_normalized = [i[:i.index(":info")] for i in keys]
253 return keys_normalized
254
255
256 @classmethod
def __store_entry(cls, entry_url, content):
    """Write ``content`` as a Redis hash under the key for ``entry_url``.

    The key is obtained by passing ``entry_url`` through
    ``cls.__get_url``.  The mapping is written with ``hmset`` on a client
    from ``cls.__get_redis_api_client`` and afterwards logged at debug
    level together with its JSON rendering.
    """
    key = cls.__get_url(entry_url)
    cls.__get_redis_api_client().hmset(key, content)

    # Build the message in the same left-to-right order as a single
    # concatenation expression would, so failures surface identically.
    message = "Store Redis entry at: " + key
    message += " Content: " + str(json.dumps(content))
    logger.debug(message)
264
265 @classmethod
def __retrieve_entry(cls, entry_url):
    """Read the Redis hash stored under the key for ``entry_url``.

    The key is obtained by passing ``entry_url`` through
    ``cls.__get_url``.  All fields are fetched with ``hgetall`` on a
    client from ``cls.__get_redis_api_client``, logged at debug level
    together with their JSON rendering, and returned unchanged.
    """
    key = cls.__get_url(entry_url)
    stored = cls.__get_redis_api_client().hgetall(key)

    # Build the message in the same left-to-right order as a single
    # concatenation expression would, so failures surface identically.
    message = "Retrieve Redis entry at: " + key
    message += " Content: " + str(json.dumps(stored))
    logger.debug(message)
    return stored
274