diff options
author | garnaat <mitch@garnaat.com> | 2011-01-30 18:21:45 -0500 |
---|---|---|
committer | garnaat <mitch@garnaat.com> | 2011-01-30 18:21:45 -0500 |
commit | 2e325779e34bae1f8b3acf2c252a88ea167c56fe (patch) | |
tree | ef4dcab8f2d14e0a481b9a871e86dfe64b567f6a /boto/iam | |
parent | 9af879e63bd043b296f02ad134db3351c065bf10 (diff) | |
download | boto-2e325779e34bae1f8b3acf2c252a88ea167c56fe.tar.gz |
moving connection classes to connection.py files for consistency.
Diffstat (limited to 'boto/iam')
-rw-r--r-- | boto/iam/__init__.py | 930 | ||||
-rw-r--r-- | boto/iam/connection.py | 952 | ||||
-rw-r--r-- | boto/iam/response.py | 146 |
3 files changed, 957 insertions, 1071 deletions
diff --git a/boto/iam/__init__.py b/boto/iam/__init__.py index aaf89839..498d736b 100644 --- a/boto/iam/__init__.py +++ b/boto/iam/__init__.py @@ -1,4 +1,5 @@ -# Copyright (c) 2010 Mitch Garnaat http://garnaat.org/ +# Copyright (c) 2010-2011 Mitch Garnaat http://garnaat.org/ +# Copyright (c) 2010-2011, Eucalyptus Systems, Inc. # # Permission is hereby granted, free of charge, to any person obtaining a # copy of this software and associated documentation files (the @@ -19,929 +20,8 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS # IN THE SOFTWARE. -import boto -import boto.iam.response -from boto.connection import AWSQueryConnection - -#boto.set_stream_logger('iam') - -class IAMConnection(AWSQueryConnection): - - APIVersion = '2010-05-08' - - def __init__(self, aws_access_key_id=None, aws_secret_access_key=None, - is_secure=True, port=None, proxy=None, proxy_port=None, - proxy_user=None, proxy_pass=None, host='iam.amazonaws.com', - debug=0, https_connection_factory=None, path='/'): - AWSQueryConnection.__init__(self, aws_access_key_id, aws_secret_access_key, - is_secure, port, proxy, proxy_port, proxy_user, proxy_pass, - host, debug, https_connection_factory, path) - - def _required_auth_capability(self): - return ['iam'] - - def get_response(self, action, params, path='/', parent=None, - verb='GET', list_marker='Set'): - """ - Utility method to handle calls to IAM and parsing of responses. - """ - if not parent: - parent = self - response = self.make_request(action, params, path, verb) - body = response.read() - boto.log.debug(body) - if response.status == 200: - e = boto.iam.response.Element(list_marker=list_marker) - h = boto.iam.response.XmlHandler(e, parent) - h.parse(body) - return e - else: - boto.log.error('%s %s' % (response.status, response.reason)) - boto.log.error('%s' % body) - raise self.ResponseError(response.status, response.reason, body) - - # - # Group methods - # - - def get_all_groups(self, path_prefix='/', marker=None, max_items=None): - """ - List the groups that have the specified path prefix. - - :type path_prefix: string - :param path_prefix: If provided, only groups whose paths match - the provided prefix will be returned. - - :type marker: string - :param marker: Use this only when paginating results and only in - follow-up request after you've received a response - where the results are truncated. Set this to the - value of the Marker element in the response you - just received. - - :type max_items: int - :param max_items: Use this only when paginating results to indicate - the maximum number of groups you want in the - response. - """ - params = {} - if path_prefix: - params['PathPrefix'] = path_prefix - if marker: - params['Marker'] = marker - if max_items: - params['MaxItems'] = max_items - return self.get_response('ListGroups', params, list_marker='Groups') - - def get_group(self, group_name, marker=None, max_items=None): - """ - Return a list of users that are in the specified group. - - :type group_name: string - :param group_name: The name of the group whose information should - be returned. - :type marker: string - :param marker: Use this only when paginating results and only in - follow-up request after you've received a response - where the results are truncated. Set this to the - value of the Marker element in the response you - just received. - - :type max_items: int - :param max_items: Use this only when paginating results to indicate - the maximum number of groups you want in the - response. - """ - params = {'GroupName' : group_name} - if marker: - params['Marker'] = marker - if max_items: - params['MaxItems'] = max_items - return self.get_response('GetGroup', params, list_marker='Users') - - def create_group(self, group_name, path='/'): - """ - Create a group. - - :type group_name: string - :param group_name: The name of the new group - - :type path: string - :param path: The path to the group (Optional). Defaults to /. - - """ - params = {'GroupName' : group_name, - 'Path' : path} - return self.get_response('CreateGroup', params) - - def delete_group(self, group_name): - """ - Delete a group. The group must not contain any Users or - have any attached policies - - :type group_name: string - :param group_name: The name of the group to delete. - - """ - params = {'GroupName' : group_name} - return self.get_response('DeleteGroup', params) - - def update_group(self, group_name, new_group_name=None, new_path=None): - """ - Updates name and/or path of the specified group. - - :type group_name: string - :param group_name: The name of the new group - - :type new_group_name: string - :param new_group_name: If provided, the name of the group will be - changed to this name. - - :type new_path: string - :param new_path: If provided, the path of the group will be - changed to this path. - - """ - params = {'GroupName' : group_name} - if new_group_name: - params['NewGroupName'] = new_group_name - if new_path: - params['NewPath'] = new_path - return self.get_response('UpdateGroup', params) - - def add_user_to_group(self, group_name, user_name): - """ - Add a user to a group - - :type group_name: string - :param group_name: The name of the new group - - :type user_name: string - :param user_name: The to be added to the group. - - """ - params = {'GroupName' : group_name, - 'UserName' : user_name} - return self.get_response('AddUserToGroup', params) - - def remove_user_from_group(self, group_name, user_name): - """ - Remove a user from a group. - - :type group_name: string - :param group_name: The name of the new group - - :type user_name: string - :param user_name: The user to remove from the group. - - """ - params = {'GroupName' : group_name, - 'UserName' : user_name} - return self.get_response('RemoveUserFromGroup', params) - - def put_group_policy(self, group_name, policy_name, policy_json): - """ - Adds or updates the specified policy document for the specified group. - - :type group_name: string - :param group_name: The name of the group the policy is associated with. - - :type policy_name: string - :param policy_name: The policy document to get. - - :type policy_json: string - :param policy_json: The policy document. - - """ - params = {'GroupName' : group_name, - 'PolicyName' : policy_name, - 'PolicyDocument' : policy_json} - return self.get_response('PutGroupPolicy', params, verb='POST') - - def get_all_group_policies(self, group_name, marker=None, max_items=None): - """ - List the names of the policies associated with the specified group. - - :type group_name: string - :param group_name: The name of the group the policy is associated with. - - :type marker: string - :param marker: Use this only when paginating results and only in - follow-up request after you've received a response - where the results are truncated. Set this to the - value of the Marker element in the response you - just received. - - :type max_items: int - :param max_items: Use this only when paginating results to indicate - the maximum number of groups you want in the - response. - """ - params = {'GroupName' : group_name} - if marker: - params['Marker'] = marker - if max_items: - params['MaxItems'] = max_items - return self.get_response('ListGroupPolicies', params, - list_marker='PolicyNames') - - def get_group_policy(self, group_name, policy_name): - """ - Retrieves the specified policy document for the specified group. - - :type group_name: string - :param group_name: The name of the group the policy is associated with. - - :type policy_name: string - :param policy_name: The policy document to get. - - """ - params = {'GroupName' : group_name, - 'PolicyName' : policy_name} - return self.get_response('GetGroupPolicy', params, verb='POST') - - def delete_group_policy(self, group_name, policy_name): - """ - Deletes the specified policy document for the specified group. - - :type group_name: string - :param group_name: The name of the group the policy is associated with. - - :type policy_name: string - :param policy_name: The policy document to delete. - - """ - params = {'GroupName' : group_name, - 'PolicyName' : policy_name} - return self.get_response('DeleteGroupPolicy', params, verb='POST') - - def get_all_users(self, path_prefix='/', marker=None, max_items=None): - """ - List the users that have the specified path prefix. - - :type path_prefix: string - :param path_prefix: If provided, only users whose paths match - the provided prefix will be returned. - - :type marker: string - :param marker: Use this only when paginating results and only in - follow-up request after you've received a response - where the results are truncated. Set this to the - value of the Marker element in the response you - just received. - - :type max_items: int - :param max_items: Use this only when paginating results to indicate - the maximum number of groups you want in the - response. - """ - params = {'PathPrefix' : path_prefix} - if marker: - params['Marker'] = marker - if max_items: - params['MaxItems'] = max_items - return self.get_response('ListUsers', params, list_marker='Users') - - # - # User methods - # - - def create_user(self, user_name, path='/'): - """ - Create a user. - - :type user_name: string - :param user_name: The name of the new user - - :type path: string - :param path: The path in which the user will be created. - Defaults to /. - - """ - params = {'UserName' : user_name, - 'Path' : path} - return self.get_response('CreateUser', params) - - def delete_user(self, user_name): - """ - Delete a user including the user's path, GUID and ARN. - - If the user_name is not specified, the user_name is determined - implicitly based on the AWS Access Key ID used to sign the request. - - :type user_name: string - :param user_name: The name of the user to delete. - - """ - params = {'UserName' : user_name} - return self.get_response('DeleteUser', params) - - def get_user(self, user_name=None): - """ - Retrieve information about the specified user. - - If the user_name is not specified, the user_name is determined - implicitly based on the AWS Access Key ID used to sign the request. - - :type user_name: string - :param user_name: The name of the user to delete. - If not specified, defaults to user making - request. - - """ - params = {} - if user_name: - params['UserName'] = user_name - return self.get_response('GetUser', params) - - def update_user(self, user_name, new_user_name=None, new_path=None): - """ - Updates name and/or path of the specified user. - - :type user_name: string - :param user_name: The name of the user - - :type new_user_name: string - :param new_user_name: If provided, the username of the user will be - changed to this username. - - :type new_path: string - :param new_path: If provided, the path of the user will be - changed to this path. - - """ - params = {'UserName' : user_name} - if new_user_name: - params['NewUserName'] = new_user_name - if new_path: - params['NewPath'] = new_path - return self.get_response('UpdateUser', params) - - def get_all_user_policies(self, user_name, marker=None, max_items=None): - """ - List the names of the policies associated with the specified user. - - :type user_name: string - :param user_name: The name of the user the policy is associated with. - - :type marker: string - :param marker: Use this only when paginating results and only in - follow-up request after you've received a response - where the results are truncated. Set this to the - value of the Marker element in the response you - just received. - - :type max_items: int - :param max_items: Use this only when paginating results to indicate - the maximum number of groups you want in the - response. - """ - params = {'UserName' : user_name} - if marker: - params['Marker'] = marker - if max_items: - params['MaxItems'] = max_items - return self.get_response('ListUserPolicies', params, - list_marker='PolicyNames') - - def put_user_policy(self, user_name, policy_name, policy_json): - """ - Adds or updates the specified policy document for the specified user. - - :type user_name: string - :param user_name: The name of the user the policy is associated with. - - :type policy_name: string - :param policy_name: The policy document to get. - - :type policy_json: string - :param policy_json: The policy document. - - """ - params = {'UserName' : user_name, - 'PolicyName' : policy_name, - 'PolicyDocument' : policy_json} - return self.get_response('PutUserPolicy', params, verb='POST') - - def get_user_policy(self, user_name, policy_name): - """ - Retrieves the specified policy document for the specified user. - - :type user_name: string - :param user_name: The name of the user the policy is associated with. - - :type policy_name: string - :param policy_name: The policy document to get. - - """ - params = {'UserName' : user_name, - 'PolicyName' : policy_name} - return self.get_response('GetUserPolicy', params, verb='POST') - - def delete_user_policy(self, user_name, policy_name): - """ - Deletes the specified policy document for the specified user. - - :type user_name: string - :param user_name: The name of the user the policy is associated with. - - :type policy_name: string - :param policy_name: The policy document to delete. - - """ - params = {'UserName' : user_name, - 'PolicyName' : policy_name} - return self.get_response('DeleteUserPolicy', params, verb='POST') - - def get_groups_for_user(self, user_name, marker=None, max_items=None): - """ - List the groups that a specified user belongs to. - - :type user_name: string - :param user_name: The name of the user to list groups for. - - :type marker: string - :param marker: Use this only when paginating results and only in - follow-up request after you've received a response - where the results are truncated. Set this to the - value of the Marker element in the response you - just received. - - :type max_items: int - :param max_items: Use this only when paginating results to indicate - the maximum number of groups you want in the - response. - """ - params = {'UserName' : user_name} - if marker: - params['Marker'] = marker - if max_items: - params['MaxItems'] = max_items - return self.get_response('ListGroupsForUser', params, list_marker='Groups') - - # - # Access Keys - # - - def get_all_access_keys(self, user_name, marker=None, max_items=None): - """ - Get all access keys associated with an account. - - :type user_name: string - :param user_name: The username of the new user - - :type marker: string - :param marker: Use this only when paginating results and only in - follow-up request after you've received a response - where the results are truncated. Set this to the - value of the Marker element in the response you - just received. - - :type max_items: int - :param max_items: Use this only when paginating results to indicate - the maximum number of groups you want in the - response. - """ - params = {'UserName' : user_name} - if marker: - params['Marker'] = marker - if max_items: - params['MaxItems'] = max_items - return self.get_response('ListAccessKeys', params, - list_marker='AccessKeyMetadata') - - def create_access_key(self, user_name=None): - """ - Create a new AWS Secret Access Key and corresponding AWS Access Key ID - for the specified user. The default status for new keys is Active - - If the user_name is not specified, the user_name is determined - implicitly based on the AWS Access Key ID used to sign the request. - - :type user_name: string - :param user_name: The username of the new user - - """ - params = {'UserName' : user_name} - return self.get_response('CreateAccessKey', params) - - def update_access_key(self, access_key_id, status, user_name=None): - """ - Changes the status of the specified access key from Active to Inactive - or vice versa. This action can be used to disable a user's key as - part of a key rotation workflow. - - If the user_name is not specified, the user_name is determined - implicitly based on the AWS Access Key ID used to sign the request. - - :type access_key_id: string - :param access_key_id: The ID of the access key. - - :type status: string - :param status: Either Active or Inactive. - - :type user_name: string - :param user_name: The username of user (optional). - - """ - params = {'AccessKeyId' : access_key_id, - 'Status' : status} - if user_name: - params['UserName'] = user_name - return self.get_response('UpdateAccessKey', params) - - def delete_access_key(self, access_key_id, user_name=None): - """ - Delete an access key associated with a user. - - If the user_name is not specified, it is determined implicitly based - on the AWS Access Key ID used to sign the request. - - :type access_key_id: string - :param access_key_id: The ID of the access key to be deleted. - - :type user_name: string - :param user_name: The username of the new user - - """ - params = {'AccessKeyId' : access_key_id} - if user_name: - params['UserName'] = user_name - return self.get_response('DeleteAccessKey', params) - - # - # Signing Certificates - # - - def get_all_signing_certs(self, marker=None, max_items=None, - user_name=None): - """ - Get all signing certificates associated with an account. - - If the user_name is not specified, it is determined implicitly based - on the AWS Access Key ID used to sign the request. - - :type marker: string - :param marker: Use this only when paginating results and only in - follow-up request after you've received a response - where the results are truncated. Set this to the - value of the Marker element in the response you - just received. - - :type max_items: int - :param max_items: Use this only when paginating results to indicate - the maximum number of groups you want in the - response. - - :type user_name: string - :param user_name: The username of the user - - """ - params = {} - if marker: - params['Marker'] = marker - if max_items: - params['MaxItems'] = max_items - if user_name: - params['UserName'] = user_name - return self.get_response('ListSigningCertificates', - params, list_marker='Certificates') - - def update_signing_cert(self, cert_id, status, user_name=None): - """ - Change the status of the specified signing certificate from - Active to Inactive or vice versa. - - If the user_name is not specified, it is determined implicitly based - on the AWS Access Key ID used to sign the request. - - :type cert_id: string - :param cert_id: The ID of the signing certificate - - :type status: string - :param status: Either Active or Inactive. - - :type user_name: string - :param user_name: The username of the user - """ - params = {'CertificateId' : cert_id, - 'Status' : status} - if user_name: - params['UserName'] = user_name - return self.get_response('UpdateSigningCertificate', params) - - def upload_signing_cert(self, cert_body, user_name=None): - """ - Uploads an X.509 signing certificate and associates it with - the specified user. - - If the user_name is not specified, it is determined implicitly based - on the AWS Access Key ID used to sign the request. - - :type cert_body: string - :param cert_body: The body of the signing certificate. - - :type user_name: string - :param user_name: The username of the new user - - """ - params = {'CertificateBody' : cert_body} - if user_name: - params['UserName'] = user_name - return self.get_response('UploadSigningCertificate', params, - verb='POST') - - def delete_signing_cert(self, cert_id, user_name=None): - """ - Delete a signing certificate associated with a user. - - If the user_name is not specified, it is determined implicitly based - on the AWS Access Key ID used to sign the request. - - :type user_name: string - :param user_name: The username of the new user - - :type cert_id: string - :param cert_id: The ID of the certificate. - - """ - params = {'CertificateId' : cert_id} - if user_name: - params['UserName'] = user_name - return self.get_response('DeleteSigningCertificate', params) - - # - # Server Certificates - # - - def get_all_server_certs(self, path_prefix='/', - marker=None, max_items=None): - """ - Lists the server certificates that have the specified path prefix. - If none exist, the action returns an empty list. - - :type path_prefix: string - :param path_prefix: If provided, only certificates whose paths match - the provided prefix will be returned. - - :type marker: string - :param marker: Use this only when paginating results and only in - follow-up request after you've received a response - where the results are truncated. Set this to the - value of the Marker element in the response you - just received. - - :type max_items: int - :param max_items: Use this only when paginating results to indicate - the maximum number of groups you want in the - response. - - """ - params = {} - if path_prefix: - params['PathPrefix'] = path_prefix - if marker: - params['Marker'] = marker - if max_items: - params['MaxItems'] = max_items - return self.get_response('ListServerCertificates', - params, - list_marker='ServerCertificateMetadataList') - - def update_server_cert(self, cert_name, new_cert_name=None, - new_path=None): - """ - Updates the name and/or the path of the specified server certificate. - - :type cert_name: string - :param cert_name: The name of the server certificate that you want - to update. - - :type new_cert_name: string - :param new_cert_name: The new name for the server certificate. - Include this only if you are updating the - server certificate's name. - - :type new_path: string - :param new_path: If provided, the path of the certificate will be - changed to this path. - """ - params = {'ServerCertificateName' : cert_name} - if new_cert_name: - params['NewServerCertificateName'] = new_cert_name - if new_path: - params['NewPath'] = new_path - return self.get_response('UpdateServerCertificate', params) - - def upload_server_cert(self, cert_name, cert_body, private_key, - cert_chain=None, path=None): - """ - Uploads a server certificate entity for the AWS Account. - The server certificate entity includes a public key certificate, - a private key, and an optional certificate chain, which should - all be PEM-encoded. - - :type cert_name: string - :param cert_name: The name for the server certificate. Do not - include the path in this value. - - :type cert_body: string - :param cert_body: The contents of the public key certificate - in PEM-encoded format. - - :type private_key: string - :param private_key: The contents of the private key in - PEM-encoded format. - - :type cert_chain: string - :param cert_chain: The contents of the certificate chain. This - is typically a concatenation of the PEM-encoded - public key certificates of the chain. - - :type path: string - :param path: The path for the server certificate. - - """ - params = {'ServerCertificateName' : cert_name, - 'CertificateBody' : cert_body, - 'PrivateKey' : private_key} - if cert_chain: - params['CertificateChain'] = cert_chain - if path: - params['Path'] = path - return self.get_response('UploadServerCertificate', params, - verb='POST') - - def get_server_certificate(self, cert_name): - """ - Retrieves information about the specified server certificate. - - :type cert_name: string - :param cert_name: The name of the server certificate you want - to retrieve information about. - - """ - params = {'ServerCertificateName' : cert_name} - return self.get_response('GetServerCertificate', params) - - def delete_server_cert(self, cert_name): - """ - Delete the specified server certificate. - - :type cert_name: string - :param cert_name: The name of the server certificate you want - to delete. - - """ - params = {'ServerCertificateName' : cert_name} - return self.get_response('DeleteServerCertificate', params) - - # - # MFA Devices - # - - def get_all_mfa_devices(self, user_name, marker=None, max_items=None): - """ - Get all MFA devices associated with an account. - - :type user_name: string - :param user_name: The username of the user - - :type marker: string - :param marker: Use this only when paginating results and only in - follow-up request after you've received a response - where the results are truncated. Set this to the - value of the Marker element in the response you - just received. - - :type max_items: int - :param max_items: Use this only when paginating results to indicate - the maximum number of groups you want in the - response. - - """ - params = {'UserName' : user_name} - if marker: - params['Marker'] = marker - if max_items: - params['MaxItems'] = max_items - return self.get_response('ListMFADevices', - params, list_marker='MFADevices') - - def enable_mfa_device(self, user_name, serial_number, - auth_code_1, auth_code_2): - """ - Enables the specified MFA device and associates it with the - specified user. - - :type user_name: string - :param user_name: The username of the user - - :type serial_number: string - :param seriasl_number: The serial number which uniquely identifies - the MFA device. - - :type auth_code_1: string - :param auth_code_1: An authentication code emitted by the device. - - :type auth_code_2: string - :param auth_code_2: A subsequent authentication code emitted - by the device. - - """ - params = {'UserName' : user_name, - 'SerialNumber' : serial_number, - 'AuthenticationCode1' : auth_code_1, - 'AuthenticationCode2' : auth_code_2} - return self.get_response('EnableMFADevice', params) - - def deactivate_mfa_device(self, user_name, serial_number): - """ - Deactivates the specified MFA device and removes it from - association with the user. - - :type user_name: string - :param user_name: The username of the user - - :type serial_number: string - :param seriasl_number: The serial number which uniquely identifies - the MFA device. - - """ - params = {'UserName' : user_name, - 'SerialNumber' : serial_number} - return self.get_response('DeactivateMFADevice', params) - - def resync_mfa_device(self, user_name, serial_number, - auth_code_1, auth_code_2): - """ - Syncronizes the specified MFA device with the AWS servers. - - :type user_name: string - :param user_name: The username of the user - - :type serial_number: string - :param seriasl_number: The serial number which uniquely identifies - the MFA device. - - :type auth_code_1: string - :param auth_code_1: An authentication code emitted by the device. - - :type auth_code_2: string - :param auth_code_2: A subsequent authentication code emitted - by the device. - - """ - params = {'UserName' : user_name, - 'SerialNumber' : serial_number, - 'AuthenticationCode1' : auth_code_1, - 'AuthenticationCode2' : auth_code_2} - return self.get_response('ResyncMFADevice', params) - - # - # Login Profiles - # - - def create_login_profile(self, user_name, password): - """ - Creates a login profile for the specified user, give the user the - ability to access AWS services and the AWS Management Console. - - :type user_name: string - :param user_name: The name of the new user - - :type password: string - :param password: The new password for the user - - """ - params = {'UserName' : user_name, - 'Password' : password} - return self.get_response('CreateLoginProfile', params) - - def delete_login_profile(self, user_name): - """ - Deletes the login profile associated with the specified user. - - :type user_name: string - :param user_name: The name of the user to delete. - - """ - params = {'UserName' : user_name} - return self.get_response('DeleteLoginProfile', params) - - def update_login_profile(self, user_name, password): - """ - Resets the password associated with the user's login profile. - - :type user_name: string - :param user_name: The name of the user - - :type password: string - :param password: The new password for the user - - """ - params = {'UserName' : user_name, - 'Password' : password} - return self.get_response('UpdateLoginProfile', params) +# this is here for backward compatibility +# originally, the IAMConnection class was defined here +from connection import IAMConnection diff --git a/boto/iam/connection.py b/boto/iam/connection.py new file mode 100644 index 00000000..ad759712 --- /dev/null +++ b/boto/iam/connection.py @@ -0,0 +1,952 @@ +# Copyright (c) 2010-2011 Mitch Garnaat http://garnaat.org/ +# Copyright (c) 2010-2011, Eucalyptus Systems, Inc. +# +# Permission is hereby granted, free of charge, to any person obtaining a +# copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, dis- +# tribute, sublicense, and/or sell copies of the Software, and to permit +# persons to whom the Software is furnished to do so, subject to the fol- +# lowing conditions: +# +# The above copyright notice and this permission notice shall be included +# in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- +# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. + +import boto +import boto.jsonresponse +from boto.connection import AWSQueryConnection + +#boto.set_stream_logger('iam') + +class IAMConnection(AWSQueryConnection): + + APIVersion = '2010-05-08' + + def __init__(self, aws_access_key_id=None, aws_secret_access_key=None, + is_secure=True, port=None, proxy=None, proxy_port=None, + proxy_user=None, proxy_pass=None, host='iam.amazonaws.com', + debug=0, https_connection_factory=None, path='/'): + AWSQueryConnection.__init__(self, aws_access_key_id, + aws_secret_access_key, + is_secure, port, proxy, + proxy_port, proxy_user, proxy_pass, + host, debug, https_connection_factory, path) + + def _required_auth_capability(self): + return ['iam'] + + def get_response(self, action, params, path='/', parent=None, + verb='GET', list_marker='Set'): + """ + Utility method to handle calls to IAM and parsing of responses. + """ + if not parent: + parent = self + response = self.make_request(action, params, path, verb) + body = response.read() + boto.log.debug(body) + if response.status == 200: + e = boto.jsonresponse.Element(list_marker=list_marker) + h = boto.jsonresponse.XmlHandler(e, parent) + h.parse(body) + return e + else: + boto.log.error('%s %s' % (response.status, response.reason)) + boto.log.error('%s' % body) + raise self.ResponseError(response.status, response.reason, body) + + # + # Group methods + # + + def get_all_groups(self, path_prefix='/', marker=None, max_items=None): + """ + List the groups that have the specified path prefix. + + :type path_prefix: string + :param path_prefix: If provided, only groups whose paths match + the provided prefix will be returned. + + :type marker: string + :param marker: Use this only when paginating results and only in + follow-up request after you've received a response + where the results are truncated. Set this to the + value of the Marker element in the response you + just received. + + :type max_items: int + :param max_items: Use this only when paginating results to indicate + the maximum number of groups you want in the + response. + """ + params = {} + if path_prefix: + params['PathPrefix'] = path_prefix + if marker: + params['Marker'] = marker + if max_items: + params['MaxItems'] = max_items + return self.get_response('ListGroups', params, + list_marker='Groups') + + def get_group(self, group_name, marker=None, max_items=None): + """ + Return a list of users that are in the specified group. + + :type group_name: string + :param group_name: The name of the group whose information should + be returned. + :type marker: string + :param marker: Use this only when paginating results and only in + follow-up request after you've received a response + where the results are truncated. Set this to the + value of the Marker element in the response you + just received. + + :type max_items: int + :param max_items: Use this only when paginating results to indicate + the maximum number of groups you want in the + response. + """ + params = {'GroupName' : group_name} + if marker: + params['Marker'] = marker + if max_items: + params['MaxItems'] = max_items + return self.get_response('GetGroup', params, list_marker='Users') + + def create_group(self, group_name, path='/'): + """ + Create a group. + + :type group_name: string + :param group_name: The name of the new group + + :type path: string + :param path: The path to the group (Optional). Defaults to /. + + """ + params = {'GroupName' : group_name, + 'Path' : path} + return self.get_response('CreateGroup', params) + + def delete_group(self, group_name): + """ + Delete a group. The group must not contain any Users or + have any attached policies + + :type group_name: string + :param group_name: The name of the group to delete. + + """ + params = {'GroupName' : group_name} + return self.get_response('DeleteGroup', params) + + def update_group(self, group_name, new_group_name=None, new_path=None): + """ + Updates name and/or path of the specified group. + + :type group_name: string + :param group_name: The name of the new group + + :type new_group_name: string + :param new_group_name: If provided, the name of the group will be + changed to this name. + + :type new_path: string + :param new_path: If provided, the path of the group will be + changed to this path. + + """ + params = {'GroupName' : group_name} + if new_group_name: + params['NewGroupName'] = new_group_name + if new_path: + params['NewPath'] = new_path + return self.get_response('UpdateGroup', params) + + def add_user_to_group(self, group_name, user_name): + """ + Add a user to a group + + :type group_name: string + :param group_name: The name of the new group + + :type user_name: string + :param user_name: The to be added to the group. + + """ + params = {'GroupName' : group_name, + 'UserName' : user_name} + return self.get_response('AddUserToGroup', params) + + def remove_user_from_group(self, group_name, user_name): + """ + Remove a user from a group. + + :type group_name: string + :param group_name: The name of the new group + + :type user_name: string + :param user_name: The user to remove from the group. + + """ + params = {'GroupName' : group_name, + 'UserName' : user_name} + return self.get_response('RemoveUserFromGroup', params) + + def put_group_policy(self, group_name, policy_name, policy_json): + """ + Adds or updates the specified policy document for the specified group. + + :type group_name: string + :param group_name: The name of the group the policy is associated with. + + :type policy_name: string + :param policy_name: The policy document to get. + + :type policy_json: string + :param policy_json: The policy document. + + """ + params = {'GroupName' : group_name, + 'PolicyName' : policy_name, + 'PolicyDocument' : policy_json} + return self.get_response('PutGroupPolicy', params, verb='POST') + + def get_all_group_policies(self, group_name, marker=None, max_items=None): + """ + List the names of the policies associated with the specified group. + + :type group_name: string + :param group_name: The name of the group the policy is associated with. + + :type marker: string + :param marker: Use this only when paginating results and only in + follow-up request after you've received a response + where the results are truncated. Set this to the + value of the Marker element in the response you + just received. + + :type max_items: int + :param max_items: Use this only when paginating results to indicate + the maximum number of groups you want in the + response. + """ + params = {'GroupName' : group_name} + if marker: + params['Marker'] = marker + if max_items: + params['MaxItems'] = max_items + return self.get_response('ListGroupPolicies', params, + list_marker='PolicyNames') + + def get_group_policy(self, group_name, policy_name): + """ + Retrieves the specified policy document for the specified group. + + :type group_name: string + :param group_name: The name of the group the policy is associated with. + + :type policy_name: string + :param policy_name: The policy document to get. + + """ + params = {'GroupName' : group_name, + 'PolicyName' : policy_name} + return self.get_response('GetGroupPolicy', params, verb='POST') + + def delete_group_policy(self, group_name, policy_name): + """ + Deletes the specified policy document for the specified group. + + :type group_name: string + :param group_name: The name of the group the policy is associated with. + + :type policy_name: string + :param policy_name: The policy document to delete. + + """ + params = {'GroupName' : group_name, + 'PolicyName' : policy_name} + return self.get_response('DeleteGroupPolicy', params, verb='POST') + + def get_all_users(self, path_prefix='/', marker=None, max_items=None): + """ + List the users that have the specified path prefix. + + :type path_prefix: string + :param path_prefix: If provided, only users whose paths match + the provided prefix will be returned. + + :type marker: string + :param marker: Use this only when paginating results and only in + follow-up request after you've received a response + where the results are truncated. Set this to the + value of the Marker element in the response you + just received. + + :type max_items: int + :param max_items: Use this only when paginating results to indicate + the maximum number of groups you want in the + response. + """ + params = {'PathPrefix' : path_prefix} + if marker: + params['Marker'] = marker + if max_items: + params['MaxItems'] = max_items + return self.get_response('ListUsers', params, list_marker='Users') + + # + # User methods + # + + def create_user(self, user_name, path='/'): + """ + Create a user. + + :type user_name: string + :param user_name: The name of the new user + + :type path: string + :param path: The path in which the user will be created. + Defaults to /. + + """ + params = {'UserName' : user_name, + 'Path' : path} + return self.get_response('CreateUser', params) + + def delete_user(self, user_name): + """ + Delete a user including the user's path, GUID and ARN. + + If the user_name is not specified, the user_name is determined + implicitly based on the AWS Access Key ID used to sign the request. + + :type user_name: string + :param user_name: The name of the user to delete. + + """ + params = {'UserName' : user_name} + return self.get_response('DeleteUser', params) + + def get_user(self, user_name=None): + """ + Retrieve information about the specified user. + + If the user_name is not specified, the user_name is determined + implicitly based on the AWS Access Key ID used to sign the request. + + :type user_name: string + :param user_name: The name of the user to delete. + If not specified, defaults to user making + request. + + """ + params = {} + if user_name: + params['UserName'] = user_name + return self.get_response('GetUser', params) + + def update_user(self, user_name, new_user_name=None, new_path=None): + """ + Updates name and/or path of the specified user. + + :type user_name: string + :param user_name: The name of the user + + :type new_user_name: string + :param new_user_name: If provided, the username of the user will be + changed to this username. + + :type new_path: string + :param new_path: If provided, the path of the user will be + changed to this path. + + """ + params = {'UserName' : user_name} + if new_user_name: + params['NewUserName'] = new_user_name + if new_path: + params['NewPath'] = new_path + return self.get_response('UpdateUser', params) + + def get_all_user_policies(self, user_name, marker=None, max_items=None): + """ + List the names of the policies associated with the specified user. + + :type user_name: string + :param user_name: The name of the user the policy is associated with. + + :type marker: string + :param marker: Use this only when paginating results and only in + follow-up request after you've received a response + where the results are truncated. Set this to the + value of the Marker element in the response you + just received. + + :type max_items: int + :param max_items: Use this only when paginating results to indicate + the maximum number of groups you want in the + response. + """ + params = {'UserName' : user_name} + if marker: + params['Marker'] = marker + if max_items: + params['MaxItems'] = max_items + return self.get_response('ListUserPolicies', params, + list_marker='PolicyNames') + + def put_user_policy(self, user_name, policy_name, policy_json): + """ + Adds or updates the specified policy document for the specified user. + + :type user_name: string + :param user_name: The name of the user the policy is associated with. + + :type policy_name: string + :param policy_name: The policy document to get. + + :type policy_json: string + :param policy_json: The policy document. + + """ + params = {'UserName' : user_name, + 'PolicyName' : policy_name, + 'PolicyDocument' : policy_json} + return self.get_response('PutUserPolicy', params, verb='POST') + + def get_user_policy(self, user_name, policy_name): + """ + Retrieves the specified policy document for the specified user. + + :type user_name: string + :param user_name: The name of the user the policy is associated with. + + :type policy_name: string + :param policy_name: The policy document to get. + + """ + params = {'UserName' : user_name, + 'PolicyName' : policy_name} + return self.get_response('GetUserPolicy', params, verb='POST') + + def delete_user_policy(self, user_name, policy_name): + """ + Deletes the specified policy document for the specified user. + + :type user_name: string + :param user_name: The name of the user the policy is associated with. + + :type policy_name: string + :param policy_name: The policy document to delete. + + """ + params = {'UserName' : user_name, + 'PolicyName' : policy_name} + return self.get_response('DeleteUserPolicy', params, verb='POST') + + def get_groups_for_user(self, user_name, marker=None, max_items=None): + """ + List the groups that a specified user belongs to. + + :type user_name: string + :param user_name: The name of the user to list groups for. + + :type marker: string + :param marker: Use this only when paginating results and only in + follow-up request after you've received a response + where the results are truncated. Set this to the + value of the Marker element in the response you + just received. + + :type max_items: int + :param max_items: Use this only when paginating results to indicate + the maximum number of groups you want in the + response. + """ + params = {'UserName' : user_name} + if marker: + params['Marker'] = marker + if max_items: + params['MaxItems'] = max_items + return self.get_response('ListGroupsForUser', params, + list_marker='Groups') + + # + # Access Keys + # + + def get_all_access_keys(self, user_name, marker=None, max_items=None): + """ + Get all access keys associated with an account. + + :type user_name: string + :param user_name: The username of the new user + + :type marker: string + :param marker: Use this only when paginating results and only in + follow-up request after you've received a response + where the results are truncated. Set this to the + value of the Marker element in the response you + just received. + + :type max_items: int + :param max_items: Use this only when paginating results to indicate + the maximum number of groups you want in the + response. + """ + params = {'UserName' : user_name} + if marker: + params['Marker'] = marker + if max_items: + params['MaxItems'] = max_items + return self.get_response('ListAccessKeys', params, + list_marker='AccessKeyMetadata') + + def create_access_key(self, user_name=None): + """ + Create a new AWS Secret Access Key and corresponding AWS Access Key ID + for the specified user. The default status for new keys is Active + + If the user_name is not specified, the user_name is determined + implicitly based on the AWS Access Key ID used to sign the request. + + :type user_name: string + :param user_name: The username of the new user + + """ + params = {'UserName' : user_name} + return self.get_response('CreateAccessKey', params) + + def update_access_key(self, access_key_id, status, user_name=None): + """ + Changes the status of the specified access key from Active to Inactive + or vice versa. This action can be used to disable a user's key as + part of a key rotation workflow. + + If the user_name is not specified, the user_name is determined + implicitly based on the AWS Access Key ID used to sign the request. + + :type access_key_id: string + :param access_key_id: The ID of the access key. + + :type status: string + :param status: Either Active or Inactive. + + :type user_name: string + :param user_name: The username of user (optional). + + """ + params = {'AccessKeyId' : access_key_id, + 'Status' : status} + if user_name: + params['UserName'] = user_name + return self.get_response('UpdateAccessKey', params) + + def delete_access_key(self, access_key_id, user_name=None): + """ + Delete an access key associated with a user. + + If the user_name is not specified, it is determined implicitly based + on the AWS Access Key ID used to sign the request. + + :type access_key_id: string + :param access_key_id: The ID of the access key to be deleted. + + :type user_name: string + :param user_name: The username of the new user + + """ + params = {'AccessKeyId' : access_key_id} + if user_name: + params['UserName'] = user_name + return self.get_response('DeleteAccessKey', params) + + # + # Signing Certificates + # + + def get_all_signing_certs(self, marker=None, max_items=None, + user_name=None): + """ + Get all signing certificates associated with an account. + + If the user_name is not specified, it is determined implicitly based + on the AWS Access Key ID used to sign the request. + + :type marker: string + :param marker: Use this only when paginating results and only in + follow-up request after you've received a response + where the results are truncated. Set this to the + value of the Marker element in the response you + just received. + + :type max_items: int + :param max_items: Use this only when paginating results to indicate + the maximum number of groups you want in the + response. + + :type user_name: string + :param user_name: The username of the user + + """ + params = {} + if marker: + params['Marker'] = marker + if max_items: + params['MaxItems'] = max_items + if user_name: + params['UserName'] = user_name + return self.get_response('ListSigningCertificates', + params, list_marker='Certificates') + + def update_signing_cert(self, cert_id, status, user_name=None): + """ + Change the status of the specified signing certificate from + Active to Inactive or vice versa. + + If the user_name is not specified, it is determined implicitly based + on the AWS Access Key ID used to sign the request. + + :type cert_id: string + :param cert_id: The ID of the signing certificate + + :type status: string + :param status: Either Active or Inactive. + + :type user_name: string + :param user_name: The username of the user + """ + params = {'CertificateId' : cert_id, + 'Status' : status} + if user_name: + params['UserName'] = user_name + return self.get_response('UpdateSigningCertificate', params) + + def upload_signing_cert(self, cert_body, user_name=None): + """ + Uploads an X.509 signing certificate and associates it with + the specified user. + + If the user_name is not specified, it is determined implicitly based + on the AWS Access Key ID used to sign the request. + + :type cert_body: string + :param cert_body: The body of the signing certificate. + + :type user_name: string + :param user_name: The username of the new user + + """ + params = {'CertificateBody' : cert_body} + if user_name: + params['UserName'] = user_name + return self.get_response('UploadSigningCertificate', params, + verb='POST') + + def delete_signing_cert(self, cert_id, user_name=None): + """ + Delete a signing certificate associated with a user. + + If the user_name is not specified, it is determined implicitly based + on the AWS Access Key ID used to sign the request. + + :type user_name: string + :param user_name: The username of the new user + + :type cert_id: string + :param cert_id: The ID of the certificate. + + """ + params = {'CertificateId' : cert_id} + if user_name: + params['UserName'] = user_name + return self.get_response('DeleteSigningCertificate', params) + + # + # Server Certificates + # + + def get_all_server_certs(self, path_prefix='/', + marker=None, max_items=None): + """ + Lists the server certificates that have the specified path prefix. + If none exist, the action returns an empty list. + + :type path_prefix: string + :param path_prefix: If provided, only certificates whose paths match + the provided prefix will be returned. + + :type marker: string + :param marker: Use this only when paginating results and only in + follow-up request after you've received a response + where the results are truncated. Set this to the + value of the Marker element in the response you + just received. + + :type max_items: int + :param max_items: Use this only when paginating results to indicate + the maximum number of groups you want in the + response. + + """ + params = {} + if path_prefix: + params['PathPrefix'] = path_prefix + if marker: + params['Marker'] = marker + if max_items: + params['MaxItems'] = max_items + return self.get_response('ListServerCertificates', + params, + list_marker='ServerCertificateMetadataList') + + def update_server_cert(self, cert_name, new_cert_name=None, + new_path=None): + """ + Updates the name and/or the path of the specified server certificate. + + :type cert_name: string + :param cert_name: The name of the server certificate that you want + to update. + + :type new_cert_name: string + :param new_cert_name: The new name for the server certificate. + Include this only if you are updating the + server certificate's name. + + :type new_path: string + :param new_path: If provided, the path of the certificate will be + changed to this path. + """ + params = {'ServerCertificateName' : cert_name} + if new_cert_name: + params['NewServerCertificateName'] = new_cert_name + if new_path: + params['NewPath'] = new_path + return self.get_response('UpdateServerCertificate', params) + + def upload_server_cert(self, cert_name, cert_body, private_key, + cert_chain=None, path=None): + """ + Uploads a server certificate entity for the AWS Account. + The server certificate entity includes a public key certificate, + a private key, and an optional certificate chain, which should + all be PEM-encoded. + + :type cert_name: string + :param cert_name: The name for the server certificate. Do not + include the path in this value. + + :type cert_body: string + :param cert_body: The contents of the public key certificate + in PEM-encoded format. + + :type private_key: string + :param private_key: The contents of the private key in + PEM-encoded format. + + :type cert_chain: string + :param cert_chain: The contents of the certificate chain. This + is typically a concatenation of the PEM-encoded + public key certificates of the chain. + + :type path: string + :param path: The path for the server certificate. + + """ + params = {'ServerCertificateName' : cert_name, + 'CertificateBody' : cert_body, + 'PrivateKey' : private_key} + if cert_chain: + params['CertificateChain'] = cert_chain + if path: + params['Path'] = path + return self.get_response('UploadServerCertificate', params, + verb='POST') + + def get_server_certificate(self, cert_name): + """ + Retrieves information about the specified server certificate. + + :type cert_name: string + :param cert_name: The name of the server certificate you want + to retrieve information about. + + """ + params = {'ServerCertificateName' : cert_name} + return self.get_response('GetServerCertificate', params) + + def delete_server_cert(self, cert_name): + """ + Delete the specified server certificate. + + :type cert_name: string + :param cert_name: The name of the server certificate you want + to delete. + + """ + params = {'ServerCertificateName' : cert_name} + return self.get_response('DeleteServerCertificate', params) + + # + # MFA Devices + # + + def get_all_mfa_devices(self, user_name, marker=None, max_items=None): + """ + Get all MFA devices associated with an account. + + :type user_name: string + :param user_name: The username of the user + + :type marker: string + :param marker: Use this only when paginating results and only in + follow-up request after you've received a response + where the results are truncated. Set this to the + value of the Marker element in the response you + just received. + + :type max_items: int + :param max_items: Use this only when paginating results to indicate + the maximum number of groups you want in the + response. + + """ + params = {'UserName' : user_name} + if marker: + params['Marker'] = marker + if max_items: + params['MaxItems'] = max_items + return self.get_response('ListMFADevices', + params, list_marker='MFADevices') + + def enable_mfa_device(self, user_name, serial_number, + auth_code_1, auth_code_2): + """ + Enables the specified MFA device and associates it with the + specified user. + + :type user_name: string + :param user_name: The username of the user + + :type serial_number: string + :param seriasl_number: The serial number which uniquely identifies + the MFA device. + + :type auth_code_1: string + :param auth_code_1: An authentication code emitted by the device. + + :type auth_code_2: string + :param auth_code_2: A subsequent authentication code emitted + by the device. + + """ + params = {'UserName' : user_name, + 'SerialNumber' : serial_number, + 'AuthenticationCode1' : auth_code_1, + 'AuthenticationCode2' : auth_code_2} + return self.get_response('EnableMFADevice', params) + + def deactivate_mfa_device(self, user_name, serial_number): + """ + Deactivates the specified MFA device and removes it from + association with the user. + + :type user_name: string + :param user_name: The username of the user + + :type serial_number: string + :param seriasl_number: The serial number which uniquely identifies + the MFA device. + + """ + params = {'UserName' : user_name, + 'SerialNumber' : serial_number} + return self.get_response('DeactivateMFADevice', params) + + def resync_mfa_device(self, user_name, serial_number, + auth_code_1, auth_code_2): + """ + Syncronizes the specified MFA device with the AWS servers. + + :type user_name: string + :param user_name: The username of the user + + :type serial_number: string + :param seriasl_number: The serial number which uniquely identifies + the MFA device. + + :type auth_code_1: string + :param auth_code_1: An authentication code emitted by the device. + + :type auth_code_2: string + :param auth_code_2: A subsequent authentication code emitted + by the device. + + """ + params = {'UserName' : user_name, + 'SerialNumber' : serial_number, + 'AuthenticationCode1' : auth_code_1, + 'AuthenticationCode2' : auth_code_2} + return self.get_response('ResyncMFADevice', params) + + # + # Login Profiles + # + + def create_login_profile(self, user_name, password): + """ + Creates a login profile for the specified user, give the user the + ability to access AWS services and the AWS Management Console. + + :type user_name: string + :param user_name: The name of the new user + + :type password: string + :param password: The new password for the user + + """ + params = {'UserName' : user_name, + 'Password' : password} + return self.get_response('CreateLoginProfile', params) + + def delete_login_profile(self, user_name): + """ + Deletes the login profile associated with the specified user. + + :type user_name: string + :param user_name: The name of the user to delete. + + """ + params = {'UserName' : user_name} + return self.get_response('DeleteLoginProfile', params) + + def update_login_profile(self, user_name, password): + """ + Resets the password associated with the user's login profile. + + :type user_name: string + :param user_name: The name of the user + + :type password: string + :param password: The new password for the user + + """ + params = {'UserName' : user_name, + 'Password' : password} + return self.get_response('UpdateLoginProfile', params) + + diff --git a/boto/iam/response.py b/boto/iam/response.py deleted file mode 100644 index fbbde90f..00000000 --- a/boto/iam/response.py +++ /dev/null @@ -1,146 +0,0 @@ -# Copyright (c) 2010 Mitch Garnaat http://garnaat.org/ -# -# Permission is hereby granted, free of charge, to any person obtaining a -# copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, dis- -# tribute, sublicense, and/or sell copies of the Software, and to permit -# persons to whom the Software is furnished to do so, subject to the fol- -# lowing conditions: -# -# The above copyright notice and this permission notice shall be included -# in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- -# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, -# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS -# IN THE SOFTWARE. - -import xml.sax - -def pythonize_name(name): - s = '' - if name[0].isupper: - s = name[0].lower() - for c in name[1:]: - if c.isupper(): - s += '_' + c.lower() - else: - s += c - return s - -class XmlHandler(xml.sax.ContentHandler): - - def __init__(self, root_node, connection): - self.connection = connection - self.nodes = [('root', root_node)] - self.current_text = '' - - def startElement(self, name, attrs): - self.current_text = '' - t = self.nodes[-1][1].startElement(name, attrs, self.connection) - if t != None: - if isinstance(t, tuple): - self.nodes.append(t) - else: - self.nodes.append((name, t)) - - def endElement(self, name): - self.nodes[-1][1].endElement(name, self.current_text, self.connection) - if self.nodes[-1][0] == name: - self.nodes.pop() - self.current_text = '' - - def characters(self, content): - self.current_text += content - - def parse(self, s): - xml.sax.parseString(s, self) - -class Element(dict): - - def __init__(self, connection=None, element_name=None, - stack=None, parent=None, list_marker='Set'): - dict.__init__(self) - self.connection = connection - self.element_name = element_name - self.list_marker = list_marker - if stack is None: - self.stack = [] - else: - self.stack = stack - self.parent = parent - - def __getattr__(self, key): - if key in self: - return self[key] - for k in self: - e = self[k] - if isinstance(e, Element): - try: - return getattr(e, key) - except AttributeError: - pass - raise AttributeError - - def startElement(self, name, attrs, connection): - self.stack.append(name) - if name.endswith(self.list_marker): - l = ListElement(self.connection, name, self.list_marker) - self[pythonize_name(name)] = l - return l - if len(self.stack) > 0: - element_name = self.stack[-1] - e = Element(self.connection, element_name, self.stack, self, self.list_marker) - self[pythonize_name(element_name)] = e - return (element_name, e) - else: - return None - - def endElement(self, name, value, connection): - if len(self.stack) > 0: - self.stack.pop() - value = value.strip() - if value: - if isinstance(self.parent, Element): - self.parent[pythonize_name(name)] = value - elif isinstance(self.parent, ListElement): - self.parent.append(value) - -class ListElement(list): - - def __init__(self, connection=None, element_name=None, - list_marker='Set', item_marker='member'): - list.__init__(self) - self.connection = connection - self.element_name = element_name - self.list_marker = list_marker - self.item_marker = item_marker - - def startElement(self, name, attrs, connection): - if name.endswith(self.list_marker): - l = ListElement(self.connection, name) - setattr(self, pythonize_name(name), l) - return l - elif name == self.item_marker: - e = Element(self.connection, name, parent=self) - self.append(e) - return e - else: - return None - - def endElement(self, name, value, connection): - if name == self.element_name: - if len(self) > 0: - empty = [] - for e in self: - if isinstance(e, Element): - if len(e) == 0: - empty.append(e) - for e in empty: - self.remove(e) - else: - setattr(self, pythonize_name(name), value) |