#!/bin/env python
import ZSI, string, sys, getopt, urlparse, os
from ZSI.wstools import WSDLTools
from ZSI.wsdl2python import WriteServiceModule
from ZSI.wsdlInterface import ZSIWsdlAdapter
USAGE = """Usage: ./serverstub -f wsdlfile | -u url [-h] [-e] [-d output dir] [-t typesmodule]
where:
wsdl -> wsdl file to generate callbacks from.
-f -> location of wsdl file in disc
-e -> enable experimental server code generation
-u -> location of wsdl via url
-d -> output directory for files
-h -> prints this message and exits.
-t -> specify a module name to use as the types implementation
"""
ID1 = ' '
ID2 = ID1 * 2
ID3 = ID1 * 3
ID4 = ID1 * 4
class ServerCallbackDescription:
method_prefix = 'soap'
def __init__(self, do_extended=0):
self.imports = ''
self.service = ''
self.classdef = ''
self.initdef = ''
self.location = ''
self.methods = []
self.actions = []
self.do_extended = do_extended
def fromWsdl(self, ws):
wsdl = ws
wsm = ZSI.wsdl2python.WriteServiceModule(ws)
self.service = wsm.get_module_names()[1]
wsm = None
ws = ZSIWsdlAdapter(ws)
self.imports = self.getImports()
for service in ws.getServicesList():
for port in service.getPortList():
# fetch the service location
for e in port.getExtensions():
soapAddress = None
if isinstance(e, ZSI.wsdlInterface.ZSISoapAddressAdapter):
soapAddress = e
if soapAddress:
self.location = soapAddress.getLocation()
# generate methods
for op in port.getBinding().getPortType().getOperationList():
self.generateMethods(op, port, self.do_extended)
if self.do_extended:
self.generateMethods2(op, port)
self.raw_wsdl = wsdl.document.toxml().replace("\"", "\\\"")
self.classdef = self.getClassdef(ws)
self.initdef = self.getInitdef(do_extended=self.do_extended)
if self.do_extended:
self.authdef = self.getAuthdef()
else:
self.authdef = ""
def getImports(self):
if self.do_extended:
i = 'from %s import *' % self.service.replace("services",
"messages")
else:
i = 'from %s import *' % self.service
i += '\nfrom ZSI.ServiceContainer import ServiceSOAPBinding'
return i
def getMethodName(self, method):
return '%s_%s' %(self.method_prefix, method)
def getClassdef(self, ws):
c = '\nclass %s(ServiceSOAPBinding):' \
% ws.getName()
c += '\n%ssoapAction = {' % ID1
for a in self.actions:
c += "\n%s'%s': '%s'," % (ID2, a[0], self.getMethodName(a[1]))
c += '\n%s}' % ID2
c += "\n%s_wsdl = \"\"\"%s\"\"\"" % (ID1, self.raw_wsdl)
return c
def getInitdef(self, do_extended=0):
uri = urlparse.urlparse( self.location )[2]
d = "\n%sdef __init__(self, post='%s', **kw):" % (ID1, uri)
d += '\n%sServiceSOAPBinding.__init__(self, post)' % ID2
if do_extended:
d += '\n%sif kw.has_key(\'impl\'):' % ID2
d += '\n%sself.impl = kw[\'impl\']' % ID3
d += '\n%sif kw.has_key(\'auth_method_name\'):' % ID2
d += '\n%sself.auth_method_name = kw[\'auth_method_name\']' % ID3
return d
def getAuthdef(self):
e = "\n%sdef authorize(self, auth_info, post, action):" % ID1
e += "\n%sif self.auth_method_name and hasattr(self.impl, self.auth_method_name):" % ID2
e += "\n%sreturn getattr(self.impl, self.auth_method_name)(auth_info, post, action)" % ID3
e += "\n%selse:" % ID2
e += "\n%sreturn 1" % ID3
return e
def generateMethods(self, op, port, do_extended=0):
# generate soapactions while we're here
operation = port.getBinding().getOperationDict().get(op.getName())
if operation.getSoapOperation():
action = operation.getSoapOperation().getAction()
if action:
self.actions.append( ( action, op.getName() ) )
# now take care of the method
o = '\n%sdef %s(self, ps):' % (ID1, self.getMethodName(op.getName()))
o += '\n%s# input vals in request object' % ID2
o += '\n%sargs = ps.Parse( %s )' % ( ID2,
op.getInput().getMessage().getName() + 'Wrapper')
o += '\n'
if do_extended:
input_args = op.getInput().getMessage().getPartList()
iargs = ["%s" % x.getName() for x in input_args]
iargs = ", ".join(iargs)
for a in op.getInput().getMessage().getPartList():
if a.getType():
o += '\n%s# %s is a %s' % (ID2, a.getName(),
a.getType().getName())
o += '\n%s%s = args.%s' % (ID2, a.getName(), a.getName())
o += "\n"
invocation = '\n\n%s# Invoke the method' % ID2
invocation += '\n%s%%sself.%s(%s)' % (ID2, op.getName(), iargs)
if op.getOutput().getMessage() is not None:
o += '\n%s# assign return values to response object' % ID2
# JRB CHECK MESSAGE TO SEE IF ITS SIMPLE
response_type = IsSimpleElementDeclaration(op, input=False)
if response_type is False:
o += '\n%sresponse = %s()' \
% ( ID2, op.getOutput().getMessage().getName() \
+ 'Wrapper' )
else:
# can't instantiate a basestring, so by default do a str
if response_type == 'basestring': response_type = 'str'
o += '\n%sclass SimpleTypeWrapper(%s): typecode = %s()' \
% ( ID2,
response_type, op.getOutput().getMessage().getName()
+ 'Wrapper' )
o += '\n\n%s# WARNING specify value eg. SimpleTypeWrapper(1)' % ID2
o += '\n%sresponse = SimpleTypeWrapper()' % ID2
if do_extended:
output_args = op.getOutput().getMessage().getPartList()
oargs = ["%s" % x.getName() for x in output_args]
invoke_return = ""
for ir in ["\n%sresponse.%s = %s" % (ID2, x.getName(), x.getName())
for x in output_args]:
invoke_return += ir
oargs = ", ".join(oargs)
if len(output_args) > 1:
print "Message has more than one return value (Bad Design)."
oargs = "(%s)" % oargs
oargs = "%s = " % oargs
o += invocation % oargs
o += '\n'
o += '\n%s# Assign return values to response' % ID2
o += invoke_return
o += '\n\n%s# Return the response' % ID2
o += '\n%sreturn response' % ID2
else:
if do_extended:
o += invocation % ""
o += '\n'
o += '\n%s# NO output' % ID2
o += '\n%sreturn None' % ID2
self.methods.append(o)
def generateMethods2(self, op, port):
# now take care of the method
input_args = op.getInput().getMessage().getPartList()
iargs = ["%s" % x.getName() for x in input_args]
iargs = ", ".join(iargs)
o = '\n%sdef %s(self, %s):' % (ID1, op.getName(), iargs)
for a in op.getInput().getMessage().getPartList():
if a.getType():
o += "\n%s# %s is a %s" % (ID2, a.getName(),
a.getType().getName())
o += "\n"
if op.getOutput().getMessage():
output_args = op.getOutput().getMessage().getPartList()
oargs = ["%s" % x.getName() for x in output_args]
oargs = ", ".join(oargs)
else:
output_args = None
if output_args:
if len(output_args) > 1:
print "Message has more than one return value (Bad Design)."
oargs = "(%s)" % oargs
oargs = "%s = " % oargs
else:
oargs = ""
o += "\n%s# If we have an implementation object use it" % ID2
o += "\n%sif hasattr(self, 'impl'):" % ID2
o += "\n%s%sself.impl.%s(%s)" % (ID3, oargs, op.getName(), iargs)
o += "\n"
if op.getOutput().getMessage() is not None:
for a in op.getOutput().getMessage().getPartList():
if a.getType():
o += "\n%s# %s is a %s" % (ID2, a.getName(),
a.getType().getName())
o += "\n%sreturn %s" % (ID2, a.getName())
else:
o += "\n%s# There is no return from this method." % ID2
self.methods.append(o)
def getContents(self):
gen_str = string.join([self.imports,
self.classdef,
self.initdef,
self.authdef,
string.join(self.methods, '\n')], '\n') + '\n'
return gen_str
def getStubName(self, do_extended=0):
if do_extended:
return '%s_interface' % self.service[:-len("_services")]
else:
return '%s_server' % self.service
def write(self, fd=sys.stdout):
fd.write( self.getContents() )
def IsSimpleElementDeclaration(op, input=True):
prt = None
if input is True and len( op.getInput().getMessage().getPartList() ) == 1:
prt = op.getInput().getMessage().getPartList()[0]
elif input is False and len( op.getOutput().getMessage().getPartList() ) == 1:
prt = op.getOutput().getMessage().getPartList()[0]
if prt is not None and prt.getElement():
return prt.getElement().isBasicElement()
return False
def doCommandLine():
args_d = {
'fromfile': False,
'fromurl': False,
'extended' : False,
'output_directory' : '.',
'types' : None
}
try:
opts, args = getopt.getopt(sys.argv[1:], 'f:u:d:t:he')
except getopt.GetoptError, e:
print >> sys.stderr, sys.argv[0] + ': ' + str(e)
sys.exit(-1)
if not opts:
print USAGE
sys.exit(-1)
for opt, val in opts:
if opt in [ '-h']:
print USAGE
sys.exit(0)
elif opt in ['-f']:
args_d['wsdl'] = val
args_d['fromfile'] = True
elif opt in ['-u']:
args_d['wsdl'] = val
args_d['fromurl'] = True
elif opt in ['-e']:
args_d['extended'] = True
elif opt in ['-d']:
args_d['output_directory'] = val
elif opt in ['-t']:
args_d['types'] = val
else:
print USAGE
sys.exit(-1)
return args_d
def main():
args_d = doCommandLine()
reader = WSDLTools.WSDLReader()
if args_d['fromfile']:
wsdl = reader.loadFromFile(args_d['wsdl'])
elif args_d['fromurl']:
wsdl = reader.loadFromURL(args_d['wsdl'])
ss = ServerCallbackDescription(do_extended = args_d['extended'])
ss.fromWsdl(wsdl)
fd = open(os.path.join(args_d['output_directory'],
ss.getStubName(args_d['extended'])+'.py'),
'w+' )
ss.write(fd)
fd.close()
if __name__ == '__main__':
main()
syntax highlighted by Code2HTML, v. 0.9.1