Files
feishu_bitable/append_bitables.py
2026-03-23 10:45:02 +08:00

131 lines
5.5 KiB
Python

# -*- coding: UTF-8 -*-
import api
import config
import logging
import utils
import copy
# Log line layout shared by every message this script emits.
LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
# NOTE: configures the root logger at import time (module-level side effect).
logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)
def get_all_records(client: api.Client, access_token: str, app_token: str, table_id: str):
    """Fetch every record of a Bitable table, following pagination.

    Keeps requesting pages while the API reports ``has_more``, passing the
    returned ``page_token`` as the cursor for the next call.

    :return: list of record items exactly as returned by the records-list API.
    """
    collected = []
    cursor = None
    has_more = True
    while has_more:
        page = client.get_records_list(access_token, app_token, table_id, page_token=cursor)
        collected.extend(page.get('items') or [])
        has_more = bool(page.get('has_more'))
        cursor = page.get('page_token')
    return collected
def get_all_fields(client: api.Client, access_token: str, app_token: str, table_id: str):
    """Fetch the complete field (column) list of a Bitable table, following pagination.

    :return: list of field items exactly as returned by the fields-list API.
    """
    result = []
    cursor = None
    while True:
        page = client.get_fields_list(access_token, app_token, table_id, page_token=cursor)
        batch = page.get('items', [])
        if batch:
            result.extend(batch)
        # No further pages: hand back everything accumulated so far.
        if not page.get('has_more'):
            return result
        cursor = page.get('page_token')
def _ensure_target_field(client, access_token, app_token, table_id, name, ftype, fprop, field_map):
    """Create field *name* in the target table and record its id in *field_map*.

    Creation failures (e.g. a field referencing a table that was not copied)
    are logged and swallowed so one bad field does not abort the whole merge.
    """
    try:
        resp = client.add_field(access_token, app_token, table_id, name, ftype, fprop)
        field_map[name] = resp['field_id']
    except utils.LarkException as e:
        logging.error(f"Failed to create field '{name}' in target: {e}")
def append_to_existing_table(client: api.Client, access_token: str, source_tables: list, target_app_token: str, target_table_id: str):
    """
    Appends data from multiple source tables into one existing target table.
    Missing fields in the target table will be automatically added.

    Every copied record is stamped with a "Source" text column holding
    "<app_token>/<table_id>" of its origin table (when that column exists).

    :param client: API client used for all Bitable calls.
    :param access_token: tenant access token used to authenticate requests.
    :param source_tables: list of (app_token, table_id) pairs to copy from.
    :param target_app_token: app token of the existing target Bitable.
    :param target_table_id: table id of the existing target table.
    """
    logging.info(f"Appending records from {len(source_tables)} source tables to target {target_app_token} / {target_table_id}")
    # 1. Read existing Target Table schema (field_name -> field_id).
    target_fields = get_all_fields(client, access_token, target_app_token, target_table_id)
    target_field_map = {f['field_name']: f['field_id'] for f in target_fields}
    # Check if "Source" field exists, if not, create it (type 1 = text).
    if "Source" not in target_field_map:
        try:
            resp = client.add_field(access_token, target_app_token, target_table_id, "Source", 1, None)
            target_field_map["Source"] = resp['field_id']
        except utils.LarkException as e:
            logging.error(f"Failed to create 'Source' field: {e}")
    for source_app_token, source_table_id in source_tables:
        logging.info(f"Processing source table {source_app_token} / {source_table_id}...")
        # 2. Merge Missing Fields: create any source field absent from the target.
        source_fields = get_all_fields(client, access_token, source_app_token, source_table_id)
        for f in source_fields:
            name = f['field_name']
            if name in target_field_map:
                continue
            # Deep-copy so stripping option IDs never mutates the source schema.
            fprop = copy.deepcopy(f.get('property'))
            # Clean up option IDs: they are table-specific and would cause
            # insertion errors in the target table.
            if fprop and 'options' in fprop:
                for opt in fprop['options']:
                    opt.pop('id', None)
            _ensure_target_field(client, access_token, target_app_token, target_table_id, name, f['type'], fprop, target_field_map)
        # 3. Read & Insert Records.
        records = get_all_records(client, access_token, source_app_token, source_table_id)
        source_name = f"{source_app_token}/{source_table_id}"
        # BUGFIX: only stamp 'Source' when the field actually exists in the
        # target; previously it was written unconditionally, so a failed
        # 'Source' field creation made every batch insert fail on an
        # unknown field name.
        stamp_source = 'Source' in target_field_map
        batch = []
        for r in records:
            # Keep only values whose field exists in the target schema.
            new_fields = {fname: fvalue for fname, fvalue in r.get('fields', {}).items() if fname in target_field_map}
            if stamp_source:
                new_fields['Source'] = source_name
            batch.append({'fields': new_fields})
        # Feishu limit is usually 500 per request; slices from this range are
        # never empty, so no extra emptiness check is needed.
        CHUNK_SIZE = 500
        for i in range(0, len(batch), CHUNK_SIZE):
            try:
                client.batch_create_records(access_token, target_app_token, target_table_id, batch[i:i + CHUNK_SIZE])
            except utils.LarkException as e:
                logging.error(f"Failed to insert chunk of records: {e}")
        logging.info(f"Appended records from {source_name} successfully.")
if __name__ == "__main__":
    client = api.Client(config.LARK_HOST)
    try:
        access_token = client.get_tenant_access_token(config.APP_ID, config.APP_SECRET)
    except Exception as e:
        logging.error(f"Could not get access token: {e}")
        # exit() is an interactive-shell helper injected by site; SystemExit
        # is the explicit, always-available way to stop a script.
        raise SystemExit(1)
    logging.info(f"Using App ID: {config.APP_ID}")
    # (app_token, table_id) pairs whose records will be appended to the target.
    SOURCE_TABLES = [
        (config.MERGE_SOURCE_APP_TOKEN_1, config.MERGE_SOURCE_TABLE_ID_1),
        (config.MERGE_SOURCE_APP_TOKEN_2, config.MERGE_SOURCE_TABLE_ID_2),
        # Add more source tables as needed: ("app_token", "table_id")
    ]
    TARGET_APP_TOKEN = config.MERGE_TARGET_APP_TOKEN
    TARGET_TABLE_ID = "target_table_id_to_append_to"  # PLEASE UPDATE THIS TO THE EXISTING MERGED TABLE ID
    # Refuse to run while any placeholder is still in place. Previously only
    # the source token was checked, so the script could fire real API calls
    # against the bogus hard-coded target table id above.
    is_placeholder = (
        config.MERGE_SOURCE_APP_TOKEN_1 == "source_app_token_1"
        or TARGET_TABLE_ID == "target_table_id_to_append_to"
    )
    if is_placeholder:
        logging.warning("Please update config.py or environment variables with valid MERGE_* tokens to execute.")
    else:
        append_to_existing_table(client, access_token, SOURCE_TABLES, TARGET_APP_TOKEN, TARGET_TABLE_ID)