1 '''
2 SSH-based coordination scheme between manager and agent
3 '''
4 import urlparse
5 import pdb
6 import glob
7 import errno
8 import sys
9 import os
10 import stat
11 import logging
12 import traceback
13 import pexpect
14
15 sys.path.append(os.path.join(os.path.dirname(__file__), "../.."))
16 from pilot.api import State
17 from bigjob import logger
18
19 SSH_OPTS="-o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -o NumberOfPasswordPrompts=0"
20
21
23 """ BigData Coordination File Management for Pilot Store """
24
25 - def __init__(self, service_url, security_context=None, pilot_data_description=None):
26 self.service_url = service_url
27 result = urlparse.urlparse(service_url)
28 self.host = result.hostname
29 self.path = result.path
30 self.user = result.username
31
32 self.pilot_data_description=pilot_data_description
33
34
35 self.userkey=None
36 self.security_context=security_context
37
38
39 if self.pilot_data_description!=None and\
40 self.pilot_data_description.has_key("userkey"):
41 self.userkey=self.pilot_data_description["userkey"]
42
43 logger.debug("Security Context: " + str(self.security_context))
44
45
46 if security_context!=None and security_context!="None":
47 logger.debug("Attempt to restore SSH credentials from security context: " + str(security_context))
48 security_context = eval(security_context)
49 key=security_context["userkey"]
50 self.userkey=os.path.join(os.getcwd(), ".ssh/id_rsa")
51 if os.path.exists(os.path.join(os.getcwd(),".ssh"))==False:
52 os.makedirs(os.path.join(os.getcwd(), ".ssh"))
53 logger.debug("Write key: " + str(type(key)) + " to: " + self.userkey)
54 try:
55 f = open(self.userkey, "w")
56 for i in key:
57 logger.debug("write: " + str(i))
58 f.write(i)
59 f.close()
60 os.chmod(self.userkey, 0600)
61 except:
62 self.__print_traceback()
63
64
66 """ Returns security context that needs to be available on the distributed
67 node in order to access this Pilot Data """
68 if (self.security_context==None or self.security_context=="None") and self.pilot_data_description.has_key("userkey"):
69 f = open(self.pilot_data_description["userkey"])
70 key = f.readlines()
71 f.close
72 self.security_context = {"userkey":key}
73 logger.debug("Return security context: " + str(self.security_context))
74 return self.security_context
75
76
78
79 try:
80 command = "mkdir -p %s"%self.path
81 self.__run_ssh_command(self.userkey, self.user, self.host, command)
82 except IOError:
83 self.__print_traceback()
84
85 self.__state=State.Running
86
87
90
91
93 self.__remove_directory(self.path)
94 self.__state=State.Done
95
96
98 if self.__client.get_transport().is_active()==True:
99 return self.__state
100 else:
101 self.__state=State.Failed
102 return self.__state
103
105 du_dir = os.path.join(self.path, str(du_id))
106 logger.debug("/bin/date")
107 command = "/bin/date"
108 self.__run_ssh_command(self.userkey, self.user, self.host, command)
109 logger.debug("mkdir: " + du_dir)
110 command = "mkdir %s"%du_dir
111 self.__run_ssh_command(self.userkey, self.user, self.host, command)
112
113
116
117
119 logger.debug("Copy DU using SCP")
120 du_items = du.list()
121 for i in du_items.keys():
122 local_filename = du_items[i]["local"]
123 remote_path = os.path.join(self.path, str(du.id), os.path.basename(local_filename))
124 logger.debug("Put file: %s to %s"%(i, remote_path))
125 if local_filename.startswith("ssh://"):
126
127 if self.__is_remote_directory(local_filename):
128 logger.warning("Path %s is a directory. Ignored."%local_filename)
129 continue
130
131
132 else:
133 try:
134 if stat.S_ISDIR(os.stat(local_filename).st_mode):
135 logger.warning("Path %s is a directory. Ignored."%local_filename)
136 continue
137 except:
138 pass
139 result = urlparse.urlparse(local_filename)
140 source_host = result.netloc
141 source_path = result.path
142 source_user = result.username
143 logger.debug(str((source_host, source_path, self.host, remote_path)))
144 self.__run_scp_command(self.userkey, source_user, source_host, source_path, self.user, self.host, remote_path)
145
146
151
152
153 - def get_du(self, du, target_url):
154 remote_url = target_url
155 local_url = self.service_url + "/" + str(du.id)
156 logger.debug("get_du(): copy %s to %s:"%(local_url, remote_url))
157 self.copy_du_to_url(du, local_url, remote_url)
158
159
161 self.__remove_directory(os.path.join(self.path, du.id))
162
163
165 logger.debug("Bytes transfered %d/%d"%(transfered_bytes, total_bytes))
166
167
168
169
170
171
172 - def transfer(self, source_url, target_url):
173 self.__third_party_transfer_scp(source_url, target_url)
174
175
177 result = urlparse.urlparse(target_url)
178 target_host = result.hostname
179 target_path = result.path
180 target_user = result.username
181 logger.debug("Create directory: %s"%target_path)
182 command = "mkdir %s"%target_path
183 rc = self.__run_ssh_command(self.userkey, target_user, target_host, command)
184 if rc==0:
185 return True
186 else:
187 return False
188
189
191 result = urlparse.urlparse(target_url)
192 return result.path
193
194
198
199
200
201
203 return os.path.join(self.path, str(du.id))
204
205
207 """Remove remote directory that may contain files.
208 """
209 if self.__exists(path):
210 command = "rm -rf %s"%path
211 rc = self.__run_ssh_command(self.userkey, self.user, self.host, command)
212 if rc==0:
213 return True
214 else:
215 return False
216
217
219 result = urlparse.urlparse(url)
220 host = result.hostname
221 path = result.path
222 user = result.username
223
224 command = "test -d %s"%path
225 rc = self.__run_ssh_command(self.userkey, user, host, command)
226 if rc==0:
227 logger.debug("Directory found: %s"%path)
228 return True
229 else:
230 logger.debug("Directory not found: %s"%path)
231 return False
232
233
235 result = urlparse.urlparse(source_url)
236 source_host = result.hostname
237 source_path = result.path
238 source_user = result.username
239 if source_host==None or source_host=="":
240 source_host="localhost"
241
242 result = urlparse.urlparse(target_url)
243 target_host = result.netloc
244 target_path = result.path
245 target_user = result.username
246 if target_host==None or target_host=="":
247 target_host="localhost"
248
249
250 if os.path.exists(os.path.dirname(source_path)):
251 logger.debug("Target and source host are localhost. Processing: %s" %(source_path))
252 expanded_path = glob.glob(source_path)
253 logger.debug("Expanded path: " + str(expanded_path))
254 for path in expanded_path:
255 if os.path.isdir(path):
256 logger.debug("Source path %s is directory"%path)
257 files = os.listdir(path)
258 for i in files:
259 try:
260 os.symlink(os.path.join(files, i), target_path)
261 except:
262 self.__print_traceback()
263 else:
264 try:
265 os.symlink(path, os.path.join(target_path, os.path.basename(path)))
266 except:
267 self.__print_traceback()
268 else:
269 self.__run_scp_command(self.userkey, source_user, source_host, source_path, target_user, target_host, target_path)
270
271
272
274 """Return True if the remote path exists
275 """
276 command = "test -e %s"%path
277 rc = self.__run_ssh_command(self.userkey, self.user, self.host, command)
278 if rc==0:
279 return True
280 else:
281 return False
282
283
284
286 prefix=""
287 if host != None:
288 prefix = "ssh " + SSH_OPTS + " "
289 if userkey != None:
290 prefix = prefix + " -i " + userkey + " "
291 if user!=None:
292 prefix = prefix + " " + user+ "@"
293 prefix = prefix + host
294
295 command = prefix + " " + command
296 logger.debug(command.strip())
297 child = pexpect.spawn(command.strip(), timeout=None)
298 output = child.readlines()
299 logger.debug("Run %s Output: %s"%(command, str(output)))
300 child.close()
301 return output
302
303
304
305 - def __run_scp_command(self, userkey, source_user, source_host, source_path, target_user, target_host, target_path):
306 logger.debug("Create scp command: source_user: %s, source_host: %s"%(source_user, source_host))
307 command = "scp " + SSH_OPTS + " "
308 if userkey != None:
309 command = command + "-i " + userkey + " "
310 if source_user!=None:
311 command = command + " " + source_user + "@"
312 if source_host != None and source_host!="" and source_host!="localhost":
313 command = command + source_host + ":"
314
315
316 command = command + source_path + " "
317
318 if target_user!=None:
319 command = command + " " + target_user + "@"
320
321 if target_host != None and target_host!="" and target_host!="localhost":
322 command = command + target_host + ":"
323
324 command = command + target_path
325 logger.debug(command)
326 child = pexpect.spawn(command.strip(), timeout=None)
327 output = child.readlines()
328 logger.debug("Run %s Output: %s"%(command, str(output)))
329 child.close()
330 return child.exitstatus
331
332
333
335 exc_type, exc_value, exc_traceback = sys.exc_info()
336 print "*** print_tb:"
337 traceback.print_tb(exc_traceback, limit=1, file=sys.stderr)
338 print "*** print_exception:"
339 traceback.print_exception(exc_type, exc_value, exc_traceback,
340 limit=2, file=sys.stderr)
341