Azure PySpark - Can a PySpark DataFrame be written to a custom log table in a Log Analytics workspace?

Posted by brc7rcf0 on 2023-05-01 in Spark

I have a PySpark DataFrame that contains information about the tables in my SQL database (creation date, row count, etc.).
Sample data:

{
   "Day":"2023-04-28",
   "Environment":"dev",
   "DatabaseName":"default",
   "TableName":"discount",
   "CountRows":31253
}

I want to write this DataFrame to a custom log table that I created in my Log Analytics workspace. Is that possible?
Thanks, everyone!


sqougxex #1

You can use the code below to write data to the custom log table.

import base64
import hashlib
import hmac
import requests
import datetime

def build_signature(customer_id, shared_key, date, content_length, method, content_type, resource):
    x_headers = 'x-ms-date:' + date
    string_to_hash = '\n'.join([method, str(content_length), content_type, x_headers, resource])
    bytes_to_hash = bytes(string_to_hash, encoding='utf-8')
    key_bytes = base64.b64decode(shared_key)
    decoded_hash = hmac.new(key_bytes, bytes_to_hash, digestmod=hashlib.sha256).digest()
    encoded_hash = base64.b64encode(decoded_hash).decode('utf-8')
    authorization = f'SharedKey {customer_id}:{encoded_hash}'
    return authorization

def post_log_analytics_data(customer_id, shared_key, body, log_type):
    method = 'POST'
    content_type = 'application/json'
    resource = '/api/logs'
    # x-ms-date must be the current UTC time in RFC 1123 format.
    rfc1123date = datetime.datetime.now(datetime.timezone.utc).strftime('%a, %d %b %Y %H:%M:%S GMT')
    # Sign and send the UTF-8 bytes so the signed length matches the payload,
    # even when the data contains non-ASCII characters.
    body_bytes = body.encode('utf-8')
    content_length = len(body_bytes)

    signature = build_signature(
        customer_id=customer_id,
        shared_key=shared_key,
        date=rfc1123date,
        content_length=content_length,
        method=method,
        content_type=content_type,
        resource=resource)

    uri = f'https://{customer_id}.ods.opinsights.azure.com{resource}?api-version=2016-04-01'
    headers = {
        'Authorization': signature,
        'Log-Type': log_type,  # name of the custom log table
        'x-ms-date': rfc1123date,
        'Content-Type': content_type,
    }
    response = requests.post(uri, headers=headers, data=body_bytes)
    return response.status_code

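# Replace the <...> placeholders with your own DataFrame, workspace ID, primary key, and custom log name.
# toJSON() yields one JSON string per row; join them into a JSON array so that every row is sent,
# not just the first one.
json_data = '[' + ','.join(<your dataframe>.toJSON().collect()) + ']'
status_code = post_log_analytics_data(<your workspace id>, <your primary key>, json_data, <log_type>)
print(status_code)  # 200 means the request was accepted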
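
If the DataFrame is large, collecting every row into a single request may run into the API's size limit per post (30 MB, as far as I know). Below is a minimal sketch of one way to split the rows into several smaller JSON-array payloads. The helper name `batch_json_rows` and the 25 MB default are my own assumptions, not part of the API, and a single row that is larger than the limit would still be sent on its own.

```python
from typing import Iterable, Iterator, List


def batch_json_rows(rows: Iterable[str], max_bytes: int = 25 * 1024 * 1024) -> Iterator[str]:
    """Group per-row JSON strings into JSON-array payloads of at most max_bytes.

    Hypothetical helper: `rows` is expected to be what df.toJSON() produces,
    i.e. one JSON object string per DataFrame row.
    """
    batch: List[str] = []
    size = 2  # bytes for the enclosing "[" and "]"
    for row in rows:
        row_size = len(row.encode("utf-8")) + 1  # +1 for the separating comma
        # Flush the current batch before it would exceed the limit.
        if batch and size + row_size > max_bytes:
            yield "[" + ",".join(batch) + "]"
            batch, size = [], 2
        batch.append(row)
        size += row_size
    if batch:
        yield "[" + ",".join(batch) + "]"


# Usage sketch (needs a live Spark session, so it is left commented out):
# for payload in batch_json_rows(df.toJSON().toLocalIterator()):
#     status_code = post_log_analytics_data(workspace_id, primary_key, payload, log_type)
```

Using `toLocalIterator()` instead of `collect()` streams the rows to the driver one partition at a time, which keeps driver memory lower for big tables.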

