1 '''
2 Google Storage based File Transfer Implementation
3 '''
4 import urlparse
5
6 import errno
7 import sys
8 import os
9 import stat
10 import logging
11 import traceback
12 import time
13
14 sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../.."))
15 from pilot.api import State
16 from bigjob import logger
17
18 from apiclient.discovery import build
19 from apiclient.http import MediaFileUpload
20 from oauth2client.file import Storage
21 from oauth2client.client import OAuth2WebServerFlow
22 from oauth2client.client import Credentials
23 from oauth2client.tools import run
24 import httplib2
25 import urllib
26
27 """
28 AN OAUTH2 Client Id must be created at the Google API console at:
29
30 https://code.google.com/apis/console/
31
32 => API Access
33
34 More information with respect to OAUTH: https://developers.google.com/compute/docs/api/how-tos/authorization
35 """
36 OAUTH2_CLIENT_ID='1004462711324-55akehip32m59u6omdfrt9s8u8ehb0hm.apps.googleusercontent.com'
37 OAUTH2_CLIENT_SECRET='EIMML1W7anu0XijVghws0DY-'
38
39 GS_PROJECT_ID="1004462711324"
40
41
43 """ BigData File Management for Pilot Data """
44
45
46 - def __init__(self, service_url, security_context=None):
47
48 self.service_url = service_url
49 self.bucket_name = self.__get_bucket_name(service_url)
50 self.__state=State.New
51
52
53 if security_context!=None:
54 logger.debug("Attempt to restore credentials from security context: " + str(security_context))
55 self.credentials = Credentials.new_from_json(security_context)
56 else:
57 storage = Storage('gce.dat')
58 self.credentials = storage.get()
59 if self.credentials is None or self.credentials.invalid == True:
60 logger.debug("No valid credential found. Run new OAuth authentication round...")
61 flow = OAuth2WebServerFlow(
62 client_id=OAUTH2_CLIENT_ID,
63 client_secret=OAUTH2_CLIENT_SECRET,
64 scope=['https://www.googleapis.com/auth/devstorage.full_control',
65 'https://www.googleapis.com/auth/compute'],
66 user_agent='bigjob-client/1.0')
67
68 self.credentials = run(flow, storage)
69
70
        # NOTE(review): the enclosing method header (presumably
        # "def get_security_context(self):") is missing from this revision of
        # the file — it must be restored; this fragment is a syntax error as-is.
        """ Returns security context that needs to be available on the distributed
            node in order to access this Pilot Data """
        # Serializes the OAuth2 credentials to JSON so a remote node can rebuild
        # them via Credentials.new_from_json() (see the security_context branch
        # in __init__).
        return self.credentials.to_json()
75
76
78
        # NOTE(review): the method header is missing from this revision — the
        # __main__ block calls this as gs.initialize_pilotdata(), so it should
        # read "def initialize_pilotdata(self):". Restore before use.
        self.__state=State.Running
        # Bucket-creation request; the bucket id must be globally unique within
        # the Google Storage project.
        request_dict = {
                        "id":self.bucket_name,
                        "projectId":GS_PROJECT_ID
        }
        logger.debug(str(request_dict))
        try:
            gs = self.__get_api_client()[0]
            gs.buckets().insert(body=request_dict).execute()
        except:
            # NOTE(review): bare except treats "bucket already exists" as
            # success, but it also hides every other failure (auth, network) —
            # consider narrowing the exception type.
            logger.debug("Error creating bucket: " + self.bucket_name)
            pass
91
92
96
97
100
101
104
105
        # NOTE(review): the method header is missing from this revision —
        # presumably "def create_du(self, du_id):" given the du_id reference
        # below; restore before use.
        gs = self.__get_api_client()[0]
        # Marks the data unit by writing a "<du_id>/du_info" object whose
        # content is the DU id itself.
        o = gs.objects().insert(bucket=self.bucket_name, name=str(du_id)+"/du_info",
                            body={'media': {
                                            "contentType":"text/ascii",
                                            "data": du_id
                                            }
                                  }
                            ).execute()
        logger.debug("Created GS: " + str(o))
116
117
        # NOTE(review): the method header is missing from this revision —
        # presumably "def put_du(self, du):" given the du references below;
        # restore before use.
        logger.debug("Copy DU to Google Storage")
        du_items = du.list()
        # Upload every local file of the DU under the "<du.id>/" prefix,
        # flattening any directory structure via basename().
        for i in du_items.keys():
            try:
                local_filename=du_items[i]["local"]
                remote_path = os.path.join(str(du.id), os.path.basename(local_filename))
                self._put_file(local_filename, remote_path)
            except:
                # NOTE(review): bare except makes the upload best-effort per
                # file, but silently drops the actual error — consider logging
                # the exception as well.
                logger.debug("Could not copy file: " + (str(i)))
128
129
132
133
134 - def get_du(self, du, target_url):
135 du_id=du.id
136 logger.debug("Get DU: " + str(du_id))
137 gs = self.__get_api_client()[0]
138 result = gs.objects().list(bucket=self.bucket_name, prefix=du_id).execute()
139
140
141 logger.debug("Result: " + str(result))
142 for i in result["items"]:
143 full_filename = i["name"]
144 self._get_file(full_filename, os.path.join(target_url, os.path.basename(full_filename)))
145
146
        # NOTE(review): the method header is missing from this revision —
        # presumably "def remove_du(self, du):" given the du reference; also,
        # __remove_directory is not defined anywhere in this file as shown —
        # verify it exists in the full source.
        self.__remove_directory(os.path.join(self.bucket_name, du.id))
149
150
151
152
        # NOTE(review): the method header is missing from this revision — the
        # __main__ block calls gs._put_file(source, target), so it should read
        # "def _put_file(self, source, target):". Restore before use.
        logger.debug("Put file: %s to %s"%(source, target))
        gs = self.__get_api_client()[0]
        # media_body accepts a local file path; the client library streams the
        # file content to the named object in the bucket.
        o = gs.objects().insert(bucket=self.bucket_name,
                                name=target,
                                media_body=source).execute()
        logger.debug("Put file result: %s"%str(o))
162
163
        # NOTE(review): the method header is missing from this revision — the
        # __main__ block calls gs._get_file(source, target), so it should read
        # "def _get_file(self, source, target):". Restore before use.
        logger.debug("GET file: %s to %s"%(source, target))
        gs, http = self.__get_api_client()
        # First fetch the object's metadata to obtain its media download link.
        f = gs.objects().get(bucket=self.bucket_name,
                             object=source).execute()
        logger.debug("Get file result: %s"%str(f))
        downloadUrl = f["media"]['link']
        if downloadUrl:
            # Download over the already-authorized httplib2 handle; the whole
            # object is buffered in memory before being written out.
            response, content = http.request(downloadUrl)
            logger.debug("Download file response: %d"%(response.status))
            # NOTE(review): "f" is reused here, shadowing the metadata dict
            # above — harmless as written, but worth renaming for clarity.
            with open(target, 'wb') as f:
                f.write(content)
176
177
178 - def transfer(self, source_url, target_url):
180
183
184
        # NOTE(review): the method header is missing from this revision —
        # presumably a private helper taking target_url; restore before use.
        # Extracts only the path component of the URL (scheme/host dropped).
        result = urlparse.urlparse(target_url)
        target_path = result.path
        return target_path
189
190
191
192
        # NOTE(review): the method header is missing from this revision —
        # call sites use self.__get_api_client(), so it should read
        # "def __get_api_client(self):". Restore before use.
        # Builds a fresh OAuth2-authorized Storage v1beta1 client per call;
        # returns (service, http) so callers can also issue raw requests
        # (e.g. the media download in _get_file).
        http = httplib2.Http()
        http = self.credentials.authorize(http)
        gs = build("storage", "v1beta1", http=http)
        return gs, http
198
199
        # NOTE(review): the method header is missing from this revision —
        # __init__ calls self.__get_bucket_name(service_url), so it should read
        # "def __get_bucket_name(self, service_url):". Restore before use.
        # Strips the gs:// scheme and ALL slashes, so "gs://bucket/sub" and
        # "gs://bucketsub" collapse to the same name — intentional flattening,
        # not path parsing.
        bucket_name = service_url.replace("gs://", "")
        bucket_name = bucket_name.replace("/", "")
        return bucket_name
204
205
        # NOTE(review): the method header is missing from this revision —
        # presumably a private debugging helper; restore before use.
        # Dumps the current exception (must be called from an except block,
        # where sys.exc_info() is populated) to stdout in two formats.
        exc_type, exc_value, exc_traceback = sys.exc_info()
        print "*** print_tb:"
        traceback.print_tb(exc_traceback, limit=1, file=sys.stdout)
        print "*** print_exception:"
        traceback.print_exception(exc_type, exc_value, exc_traceback,
                              limit=2, file=sys.stdout)
213
214
if __name__ == "__main__":
    # Ad-hoc smoke test: needs valid Google OAuth2 credentials (gce.dat or an
    # interactive browser flow), network access, and the local test files.
    gs = GSFileAdaptor("gs://pilot-data-bucket-1234")
    gs.initialize_pilotdata()
    gs._put_file("test-random.exe", "test.exe")
    gs._get_file("test.txt", "test2.txt")
    # NOTE(review): passing None as the DU will fail inside get_du (it reads
    # du.id) — this line can never have worked as written; verify intent.
    gs.get_du(None, ".")
221