"Fossies" - the Fresh Open Source Software Archive

Member "ec2-api-12.0.0/ec2api/api/customer_gateway.py" (14 Apr 2021, 2902 Bytes) of package /linux/misc/openstack/ec2-api-12.0.0.tar.gz:


As a special service, "Fossies" has tried to format the requested source page into HTML using (guessed) Python source code syntax highlighting (style: standard) with prefixed line numbers. Alternatively, you can view or download the uninterpreted source code file here. For more information about "customer_gateway.py", see the Fossies "Dox" file reference documentation.

    1 # Copyright 2014
    2 # The Cloudscaling Group, Inc.
    3 #
    4 # Licensed under the Apache License, Version 2.0 (the "License");
    5 # you may not use this file except in compliance with the License.
    6 # You may obtain a copy of the License at
    7 # http://www.apache.org/licenses/LICENSE-2.0
    8 #
    9 # Unless required by applicable law or agreed to in writing, software
   10 # distributed under the License is distributed on an "AS IS" BASIS,
   11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   12 # See the License for the specific language governing permissions and
   13 # limitations under the License.
   14 
   15 from ec2api.api import common
   16 from ec2api.api import ec2utils
   17 from ec2api.db import api as db_api
   18 from ec2api import exception
   19 from ec2api.i18n import _
   20 
   21 
   22 """Customer gateways related API implementation
   23 """
   24 
   25 
# Module-level alias of common.Validator.
# NOTE(review): nothing in this module references the alias; presumably it is
# looked up by name from outside (e.g. for request validation) -- confirm
# before removing.
Validator = common.Validator


# The only BGP ASN that create_customer_gateway() accepts, and the value
# reported as 'bgpAsn' for every formatted customer gateway.
DEFAULT_BGP_ASN = 65000
   30 
   31 
   32 def create_customer_gateway(context, ip_address, type, bgp_asn=None):
   33     if bgp_asn and bgp_asn != DEFAULT_BGP_ASN:
   34         raise exception.Unsupported("BGP dynamic routing is unsupported")
   35     customer_gateway = next((cgw for cgw in db_api.get_items(context, 'cgw')
   36                              if cgw['ip_address'] == ip_address), None)
   37     if not customer_gateway:
   38         customer_gateway = db_api.add_item(context, 'cgw',
   39                                            {'ip_address': ip_address})
   40     return {'customerGateway': _format_customer_gateway(customer_gateway)}
   41 
   42 
   43 def delete_customer_gateway(context, customer_gateway_id):
   44     customer_gateway = ec2utils.get_db_item(context, customer_gateway_id)
   45     vpn_connections = db_api.get_items(context, 'vpn')
   46     if any(vpn['customer_gateway_id'] == customer_gateway['id']
   47            for vpn in vpn_connections):
   48         raise exception.IncorrectState(
   49             reason=_('The customer gateway is in use.'))
   50     db_api.delete_item(context, customer_gateway['id'])
   51     return True
   52 
   53 
   54 def describe_customer_gateways(context, customer_gateway_id=None,
   55                                filter=None):
   56     formatted_cgws = CustomerGatewayDescriber().describe(
   57         context, ids=customer_gateway_id, filter=filter)
   58     return {'customerGatewaySet': formatted_cgws}
   59 
   60 
   61 class CustomerGatewayDescriber(common.TaggableItemsDescriber,
   62                                common.NonOpenstackItemsDescriber):
   63 
   64     KIND = 'cgw'
   65     FILTER_MAP = {'bgp-asn': 'bgpAsn',
   66                   'customer-gateway-id': 'customerGatewayId',
   67                   'ip-address': 'ipAddress',
   68                   'state': 'state',
   69                   'type': 'type'}
   70 
   71     def format(self, customer_gateway):
   72         return _format_customer_gateway(customer_gateway)
   73 
   74 
   75 def _format_customer_gateway(customer_gateway):
   76     return {'customerGatewayId': customer_gateway['id'],
   77             'ipAddress': customer_gateway['ip_address'],
   78             'state': 'available',
   79             'type': 'ipsec.1',
   80             'bgpAsn': DEFAULT_BGP_ASN}