'''
Globus Online based File Transfer
'''
import urlparse
import pdb
import glob
import errno
import sys
import os
import stat
import logging
import traceback
import time
import paramiko

sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../.."))

from pilot.api import State
from bigjob import logger

from globusonline.transfer import api_client
from globusonline.transfer.api_client.goauth import get_access_token


class GlobusOnlineFileAdaptor(object):
    """ BigData Coordination File Management for Pilot Data """

    URL_PREFIX = "go://"
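    # Service URLs follow the go:// scheme used throughout this adaptor, e.g.
    # go://user:password@cli.globusonline.org?ep=<endpoint>&path=<directory>;
    # the endpoint name and base path are pulled out of the query string by
    # __get_ep() and __get_path() below.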
    def __init__(self, service_url):
        self.service_url = service_url
        result = urlparse.urlparse(service_url)
        self.host = result.netloc
        self.query = result.path
        self.ep = self.__get_ep(self.query)
        self.path = self.__get_path(self.query)
        self.user = result.username
        self.password = result.password

        # obtain a GOAuth access token and hand it to the Transfer API client
        result = get_access_token(ca_certs=None, username=self.user, password=self.password)
        saml_cookie = result.token

        self.api = api_client.TransferAPIClient(username=self.user,
                                                goauth=saml_cookie)
        # the initial task_list() call also verifies that the token is usable
        status_code, status_message, data = self.api.task_list()

        self.__state = State.New

    def get_security_context(self):
        """ Returns security context that needs to be available on the distributed
            node in order to access this Pilot Data """
        return None

    def initialize_pilotdata(self):
        # create the base directory on the endpoint; ignore the error if it already exists
        try:
            self.api.endpoint_mkdir(self.ep, self.path)
        except:
            pass
        self.__state = State.Running

    def delete_pilotdata(self):
        self.api.endpoint_delete(self.ep, self.path)
        self.__state = State.Done

    def get_state(self):
        # no persistent connection is kept for Globus Online, so check only that the
        # Transfer API client is still in place
        if self.api != None:
            return self.__state
        else:
            self.__state = State.Failed
            return self.__state

    def create_du(self, du_id):
        du_dir = os.path.join(self.path, str(du_id))
        logger.debug("mkdir: " + du_dir)
        self.api.endpoint_mkdir(self.ep, du_dir)
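    # put_du() stages every file of a data unit into this pilot store: ssh:// sources
    # are copied with scp to self.host, while go:// sources are handed to the
    # Globus Online third-party transfer helper.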
    def put_du(self, du):
        logging.debug("Copy DU using Globus Online")
        du_items = du.list()
        for i in du_items.keys():
            local_filename = du_items[i]["local"]
            remote_path = os.path.join(self.path, str(du.id), os.path.basename(local_filename))
            logging.debug("Put file: %s to %s" % (local_filename, remote_path))
            if local_filename.startswith("ssh://"):
                # directories cannot be staged this way
                if self.__is_remote_directory(local_filename):
                    logging.warning("Path %s is a directory. Ignored." % local_filename)
                    continue
                result = urlparse.urlparse(local_filename)
                source_host = result.netloc
                source_path = result.path
                logger.debug(str((source_host, source_path, self.host, remote_path)))
                if source_host == "" or source_host is None:
                    cmd = "scp " + source_path + " " + self.host + ":" + remote_path
                else:
                    cmd = "scp " + source_host + ":" + source_path + " " + self.host + ":" + remote_path
                logger.debug("Command: %s" % cmd)
                os.system(cmd)
            elif local_filename.startswith("go://"):
                self.__third_party_transfer_host(local_filename, self.service_url + "/" + str(du.id))
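    # copy_du_to_url() exports a data unit: if the target is a local absolute path,
    # the DU content is symlinked in place; otherwise the remote directory is created
    # and each file is moved via a Globus Online third-party transfer.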
    def copy_du_to_url(self, du, local_url, remote_url):
        base_dir = self.__get_path_for_du(du)
        logger.debug("copy_du_to_url, source: %s remote: %s" % (base_dir, remote_url))
        if remote_url.startswith("/") and os.path.exists(base_dir):
            target_path = remote_url
            source_path = base_dir
            logger.debug("Target and source host are localhost. Processing: %s" % (source_path))
            expanded_path = glob.glob(source_path + "/*")
            logger.debug("Expanded path: " + str(expanded_path))
            for path in expanded_path:
                if os.path.isdir(path):
                    logger.debug("Source path %s is directory" % path)
                    files = os.listdir(path)
                    for i in files:
                        try:
                            # link each file of the source directory into the target directory
                            os.symlink(os.path.join(path, i), os.path.join(target_path, i))
                        except:
                            self.__print_traceback()
                else:
                    try:
                        os.symlink(path, os.path.join(target_path, os.path.basename(path)))
                    except:
                        self.__print_traceback()
        else:
            self.create_remote_directory(remote_url)
            for filename in self.__sftp.listdir(base_dir):
                file_url = local_url + "/" + filename
                file_remote_url = remote_url + "/" + filename
                logger.debug("Copy " + file_url + " to " + file_remote_url)
                self.__third_party_transfer_host(file_url, file_remote_url)

    def get_du(self, du, target_url):
        self.copy_du_to_url(du, self.service_url + "/" + str(du.id), target_url)

    def remove_du(self, du):
        self.__remove_directory(os.path.join(self.path, str(du.id)))
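    # Plain file management operations, independent of Pilot Data units.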
    def transfer(self, source_url, target_url):
        self.__third_party_transfer_host(source_url, target_url)

    def create_remote_directory(self, target_url):
        if not self.__is_remote_directory(target_url):
            result = urlparse.urlparse(target_url)
            target_query = result.path
            target_ep = self.__get_ep(target_query)
            target_path = self.__get_path(target_query)
            result = self.api.endpoint_mkdir(target_ep, target_path)
            logger.debug("GO EP: %s Directory: %s Creation Result: %s" % (target_ep, target_path, str(result)))
            return True
        return True

    def get_path(self, target_url):
        result = urlparse.urlparse(target_url)
        target_query = result.path
        target_path = self.__get_path(target_query)
        return target_path

    def __get_path_for_du(self, du):
        return os.path.join(self.path, str(du.id))
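    # __remove_directory() and __exists() operate over SFTP (paramiko) rather than
    # the Globus Online API; they expect an open SFTP session in self.__sftp.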
    def __remove_directory(self, path):
        """Remove remote directory that may contain files.
        """
        if self.__exists(path):
            for filename in self.__sftp.listdir(path):
                filepath = os.path.join(path, filename)
                logging.debug("Delete %s" % filepath)
                if stat.S_ISDIR(self.__sftp.stat(filepath).st_mode):
                    self.__remove_directory(filepath)
                else:
                    self.__sftp.remove(filepath)
            self.__sftp.rmdir(path)

    def __is_remote_directory(self, url):
        try:
            result = urlparse.urlparse(url)
            target_query = result.path
            target_ep = self.__get_ep(target_query)
            target_path = self.__get_path(target_query)
            result = self.api.endpoint_ls(target_ep, target_path)
            logger.debug("GO EP: %s Directory: %s Listing Result: %s" % (target_ep, target_path, str(result)))
            return True
        except:
            pass
        return False

    def __create_sftp_client(self):
        # open an SSH connection and an SFTP session rooted at self.path on self.host
        ssh_client = paramiko.SSHClient()
        ssh_client.load_system_host_keys()
        ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh_client.connect(self.host)
        sftp_client = ssh_client.open_sftp()
        sftp_client.chdir(self.path)
        return ssh_client, sftp_client
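    # Third-party transfer workflow against the Globus Online Transfer API:
    # request a submission id, build a Transfer for the source/target endpoint
    # and path, submit it, and poll the resulting task until it leaves the
    # ACTIVE state (see __wait_for_task below).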
    def __third_party_transfer_host(self, source_url, target_url):
        """
        Transfers from the source URL to the target URL via a Globus Online
        third-party transfer.
        """
        transfer_start = time.time()
        result = urlparse.urlparse(source_url)
        source_query = result.path
        source_ep = self.__get_ep(source_query)
        source_path = self.__get_path(source_query)

        result = urlparse.urlparse(target_url)
        target_query = result.path
        target_ep = self.__get_ep(target_query)
        target_path = self.__get_path(target_query)

        # transfer into a file of the same name below the target directory
        target_path = os.path.join(target_path, os.path.basename(source_path))
        logger.debug("transfer from %s:%s to %s:%s" % (source_ep, source_path, target_ep, target_path))

        if os.path.exists(os.path.dirname(source_path)) and os.path.exists(target_path):
            logger.debug("Target and source host are localhost. Processing: %s" % (source_path))
            expanded_path = glob.glob(source_path)
            logger.debug("Expanded path: " + str(expanded_path))
            for path in expanded_path:
                if os.path.isdir(path):
                    logger.debug("Source path %s is directory" % path)
                    files = os.listdir(path)
                    for i in files:
                        try:
                            os.symlink(os.path.join(path, i), os.path.join(target_path, i))
                        except:
                            self.__print_traceback()
                else:
                    try:
                        os.symlink(path, os.path.join(target_path, os.path.basename(path)))
                    except:
                        self.__print_traceback()

        transfer_id = self.api.submission_id()[2]["value"]
        logger.debug("Transfer ID: %s" % transfer_id)
        transfer = api_client.Transfer(transfer_id, source_ep, target_ep,
                                       deadline=None, sync_level=None, label=None)
        transfer.add_item(source_path=source_path, destination_path=target_path, recursive=False)
        result = self.api.transfer(transfer)
        task_id = result[2]["task_id"]
        logger.debug("Transfer Request Result: %s Task ID: %s" % (str(result), task_id))
        self.__wait_for_task(task_id)
        logger.debug("Task ID: %s Time: %d sec" % (transfer_id, (time.time() - transfer_start)))

    def __get_ep(self, query_string):
        if query_string.startswith("?"):
            query_string = query_string[1:]
        comp = query_string.split("&")
        for i in comp:
            part = i.split("=")
            if part[0] == "ep":
                return part[1]

    def __get_path(self, query_string):
        if query_string.startswith("?"):
            query_string = query_string[1:]
        comp = query_string.split("&")
        for i in comp:
            part = i.split("=")
            if part[0] == "path":
                return part[1]
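    # __wait_for_task() polls the task status roughly once per second; with
    # timeout=None it blocks until the task leaves the ACTIVE state, otherwise
    # it gives up after about `timeout` polls.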
    def __wait_for_task(self, task_id, timeout=None):
        original_timeout = timeout
        status = "ACTIVE"
        while (timeout == None or timeout > 0) and status == "ACTIVE":
            code, reason, data = self.api.task(task_id, fields="status")
            status = data["status"]
            time.sleep(1)
            if timeout != None:
                timeout -= 1

        if status != "ACTIVE":
            print "Task %s complete!" % task_id
            return True
        else:
            print "Task %s still not complete after %d seconds" % (task_id, original_timeout)
            return False

    def __exists(self, path):
        """Return True if the remote path exists
        """
        try:
            self.__sftp.stat(path)
        except IOError, e:
            if e.errno == errno.ENOENT:
                return False
            raise
        else:
            return True

    def __print_traceback(self):
        exc_type, exc_value, exc_traceback = sys.exc_info()
        print "*** print_tb:"
        traceback.print_tb(exc_traceback, limit=1, file=sys.stdout)
        print "*** print_exception:"
        traceback.print_exception(exc_type, exc_value, exc_traceback,
                                  limit=2, file=sys.stdout)


if __name__ == "__main__":
    go = GlobusOnlineFileAdaptor("http://drelu:bigjob@cli.globusonline.org?ep=drelu#egi&path=/ho")
    go.transfer("go://cli.globusonline.org?ep=drelu#MacBook&path=/~/cert.tar.gz", "go://cli.globusonline.org?ep=xsede#kraken&path=/~/")