In my blog AWS Elasticsearch Service with Firehose Delivery Stream we saw how easy it is to setup an Elasticsearch cluster, ingesting data data from Firehose and creating dashboards in Kibana. This time we use the same architecture to ingest access logs from AWS ApiGateway and analyze the data in Kibana.
API Gateway Account
In every AWS account there is a single API Gateway (APIGW) service. APIGW can hosts multiple RestApi instances. Each RestApi instance contains multiple stages like dev, test or prod. To prepare for logging, the single APIGW service instance has to have permissions to access CloudWatch logs. The configuration is easy. Create a role that the APIGW service can assume, and create an ‘AWS::ApiGateway::Account’ that points to the role.
CloudWatchLogRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- apigateway.amazonaws.com
Action: sts:AssumeRole
Path: /
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AmazonAPIGatewayPushToCloudWatchLogs
Account:
Type: AWS::ApiGateway::Account
Properties:
CloudWatchRoleArn: !GetAtt 'CloudWatchLogRole.Arn'
API Gateway Deployment
A RestApi instance contains a Deployment that specifies, among others, logging and tracing configuration. To configure access logs, we have to configure the AccessLogs settings and define a log line Format. The RestApi will use this format to create access log lines in CloudWatch logs. There are several standard formats to choose from, but you can also specify a standard format like the JSON below.
ApiGatewayDeployment:
Type: AWS::ApiGateway::Deployment
Properties:
RestApiId: !Ref RestAPIv1
StageName: dev
Stagecontent:
DataTraceEnabled: true
LoggingLevel: INFO
MetricsEnabled: true
TracingEnabled: true
MethodSettings:
- LoggingLevel: INFO
ResourcePath: /*
HttpMethod: '*'
AccessLogSetting:
DestinationArn: !GetAtt 'CloudWatchAccessLogGroup.Arn'
Format: >-
{
"requestId":"$context.requestId",
"ip": "$context.identity.sourceIp",
"caller":"$context.identity.caller",
"user":"$context.identity.user",
"requestTime":"$context.requestTime",
"httpMethod":"$context.httpMethod",
"resourcePath":"$context.resourcePath",
"status":"$context.status",
"protocol":"$context.protocol",
"responseLength":"$context.responseLength"
}
When the gateway is accessed by typing make hello
or make error
, the RestApi will be invoked and the following access logs will appear:
Error:
{
"requestId": "3b864b4b-ea50-11e8-b859-9bd3a7d2b23c",
"ip": "217.19.26.243",
"caller": "-",
"user": "-",
"requestTime": "17/Nov/2018:10:04:55 +0000",
"httpMethod": "GET",
"resourcePath": "/error",
"status": "502",
"protocol": "HTTP/1.1",
"responseLength": "36"
}
Hello:
{
"requestId": "3a71f4fa-ea50-11e8-b4f5-8116b21e9431",
"ip": "217.19.26.243",
"caller": "-",
"user": "-",
"requestTime": "17/Nov/2018:10:04:53 +0000",
"httpMethod": "GET",
"resourcePath": "/hello",
"status": "200",
"protocol": "HTTP/1.1",
"responseLength": "14"
}
CloudWatch Logs Subscription Filter
A CloudWatch Logs Subscription Filter (CLSF) sends log events to Kinesis stream, Kinesis Data Firehose delivery stream, or Lambda function. To access these resources, the CLSF has to have permissions. The configuration is easy. Create a role that CloudWatch can assume and configure that role on the ‘AWS::Logs::SubscriptionFilter’ resource. The filter is empty so all log lines will be processed.
CloudWatchLogSubscriptionRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- logs.eu-west-1.amazonaws.com
Action: sts:AssumeRole
Path: /
Policies:
- PolicyName: Allow
PolicyDocument:
Statement:
- Effect: Allow
Action:
- firehose:*
Resource:
- '*'
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AmazonAPIGatewayPushToCloudWatchLogs
CloudWatchLogSubscription:
Type: AWS::Logs::SubscriptionFilter
Properties:
DestinationArn: !GetAtt Deliverystream.Arn
FilterPattern: ''
LogGroupName: !Ref CloudWatchAccessLogGroup
RoleArn: !GetAtt CloudWatchLogSubscriptionRole.Arn
Decompressing, Decoding and Selecting
When we look at a received CloudWatch event, we see that the logEvents
field contain a single entry, which is the access log. The access log is stored in a JSON field as a string so we have to decode the field to JSON. When the message field has been decoded, we select only the first element of the logEvents field, which is the access log.
{
"messageType": "DATA_MESSAGE",
"owner": "612483924670",
"logGroup": "api-gateway-access-logs-dev",
"logStream": "8d5e957f297893487bd98fa830fa6413",
"subscriptionFilters": [
"blog-aws-elasticsearch-firehose-api-gw-example-elasticsearch-CloudWatchLogSubscription-95E57LT45IU0"
],
"logEvents": [
{
"id": "34397996112421660129451470773032598202872829922794405888",
"timestamp": 1542459492102,
"message": "{ "requestId":"70939afd-ea68-11e8-add9-17a2f8edb26e", "ip": "217.19.26.243", "caller":"-", "user":"-", "requestTime":"17/Nov/2018:12:58:12 +0000", "httpMethod":"GET", "resourcePath":"/error", "status":"502", "protocol":"HTTP/1.1", "responseLength":"36" }"
}
]
}
Firehose Processor
In order to correctly index log lines we have to post-process the log lines before they are published to Elasticsearch. Firehose supports processing messages before delivery by means of AWS Lambda. CloudWatch events publishes the log lines in Gzip format to Elasticsearch. The cloudwatch event also contains information that we are not interested in, we are only interested in the fields of the access logs. We use the following lambda to process the logs:
processor:
from base64 import b64encode, b64decode
import json
import gzip
def decompress(data):
return gzip.decompress(data)
def decode_record(data: dict) -> dict:
x = decompress(b64decode(data['data']))
return json.loads(x.decode('utf8'))
def handler(event, context):
records = event['records']
for record in records:
record.pop('approximateArrivalTimestamp', None)
decoded = decode_record(record)
if decoded['messageType'] == "DATA_MESSAGE":
event = decoded['logEvents'][0]
event.update({'message': json.loads(event['message'])})
msg = b64encode(bytes(json.dumps(event), 'utf-8')).decode('ascii')
record.update({'data': msg})
record.update({'result': 'Ok'}) # Ok, Dropped, ProcessingFailed
else:
record.update({'result': 'Dropped'}) # Ok, Dropped, ProcessingFailed
return {'records': records}
Example
The example project shows how to configure a project to create an elasticsearch cluster and to ingest API Gateway access logs. The example can be deployed with make merge-lambda && make merge-swagger && make deploy
and removed with make delete
. To publish access the API Gateway type make hello
and make error
to get some entries in the access logs.
Kibana
Log into the ‘AWS Console’, then the ‘Elasticsearch service dashboard’, and click on the Kibana URL. Once logged in, click on ‘discover’ and create a new index pattern with the name example-*
. Click on ‘discover’ another time and you should see data. If not, type make hello
and make error
a couple of times in the console to have data available in ES. To search for data type message.status:50*
or message.status:20*
in the search bar.
Try to create a visualization and dashboards with the access logs you have available. If you have read my previous blog post about AWS Elasticsearch Service with Firehose Delivery Stream it shouldn’t be difficult
Conclusion
In this example we have deployed a Elasticsearch service, ingested access logs, and created a dashboard. Elasticsearch is perfect for analyzing access logs to get real time information about how an API is performing. Dashboards provide aggregated data and provide insights that can be used to make changes to the platform. Next time we’ll look at ingesting CloudTrail logs and get insights to who is doing what in the AWS account.