Hello
I wrote a dozen Python scripts to help me migrate/merge/clean/import big blogs in Ghost.
One thing I don’t like in Ghost is that when you create a post, images are uploaded to the current year/month directory. I want images to be moved to the right directory (the one corresponding to the post’s published_at date).
Another thing is the feature_image. I don’t want to upload a duplicate of an image; I want to reuse one that is already in my post.
So, my goal is to write a script that can:
- move images to the right dir
- update links in mobiledoc
- automatically set the feature image, when it is not set, to the first landscape image (can be forced to a specific image)
- remove .o files (I want to keep originals elsewhere)
Here is the code I use. It is not complete or robust, and you NEED to know Python to use it (I will not provide support).
As I spent hundreds of hours on this, I just want to share the code to help you if you have the same need, or as inspiration if you want to do something equivalent in another language (e.g. JS). I would be very happy if somebody came up with something in JS that can be triggered automatically…
Create a JSON file with:
{ "myblog": {
"ghost_url": "http://yoururl.com",
"ghost_dir": "/mnt/data/....",
"admin_key": "YOUR_KEY"
}
}
Save the following code in a file named update.py and launch it:
python update.py --config ghost-config.json --server myblog
import argparse
import json
import os
import re
import requests
import jwt
import logging
import calendar
from pathlib import Path
from datetime import datetime
from PIL import Image
# Module-level logger shared by every function in this script.
logger = logging.getLogger(__name__)
def get_config(config_file):
    """Load the JSON configuration file and return its content.

    Args:
        config_file: Path to a JSON file mapping a server name to its
            settings (``ghost_url``, ``ghost_dir`` and ``admin_key``).

    Returns:
        The decoded JSON document.

    Raises:
        SystemExit: With status 1 when the file cannot be read or does not
            contain valid JSON.
    """
    try:
        with open(config_file, encoding='utf-8') as json_file:
            return json.load(json_file)
    except (OSError, ValueError):
        # OSError covers a missing/unreadable file; ValueError covers both
        # json.JSONDecodeError and an undecodable (non UTF-8) file.
        logger.error("Incorrect or missing JSON config file")
        # ``exit()`` is only injected by the ``site`` module; raising
        # SystemExit directly works in every interpreter setup.
        raise SystemExit(1)
def authenticate(config, server):
    """Build the ``Authorization`` header for the Ghost Admin API.

    The admin key has the form ``<id>:<hex secret>``. A short-lived JWT is
    signed with the decoded secret and returned as a ready-to-use header.

    Args:
        config: Configuration dict as returned by ``get_config``.
        server: Key of the server entry to use inside ``config``.

    Returns:
        A dict holding the single ``Authorization`` header.
    """
    # Ghost documents a maximum token lifetime of 5 minutes.
    token_limit = 5
    admin_key = config[server]['admin_key']
    admin_id, admin_secret = admin_key.split(':')
    iat = int(datetime.now().timestamp())
    header = {'alg': 'HS256', 'typ': 'JWT', 'kid': admin_id}
    payload = {
        'iat': iat,
        'exp': iat + token_limit * 60,
        'aud': '/v3/admin/'
    }
    token = jwt.encode(
        payload, bytes.fromhex(admin_secret),
        algorithm='HS256', headers=header)
    # PyJWT < 2.0 returns bytes, which would be rendered as "b'...'" below.
    if isinstance(token, bytes):
        token = token.decode('ascii')
    return {'Authorization': 'Ghost {}'.format(token)}
def clean_name(img):
    """Normalise an image file name.

    Strips a ``-<width>x<height>`` size suffix and/or a single-digit
    ``-<n>`` suffix located right before the extension, then rewrites the
    ``.JPG`` / ``.jpg`` extension to ``.jpeg``.

    Args:
        img: Image file name (no directory part expected).

    Returns:
        The normalised file name.
    """
    # Size suffix followed by a one-digit counter, e.g. "-300x200-1."
    img = re.sub(r'-[0-9]+x[0-9]+-[0-9]\.', '.', img)
    # Size suffix alone, e.g. "-300x200."; digits are required on both
    # sides so that a name ending in "-x." is left untouched.
    img = re.sub(r'-[0-9]+x[0-9]+\.', '.', img)
    # One-digit counter alone, e.g. "-1."
    img = re.sub(r'-[0-9]\.', '.', img)
    # The dot is escaped: unescaped it matched any character and mangled
    # names that merely contain the letters "JPG"/"jpg".
    img = re.sub(r'\.JPG', '.jpeg', img)
    img = re.sub(r'\.jpg', '.jpeg', img)
    return img
def get_post(headers, config, server, post_slug):
    """Fetch a single post, looked up by slug, from the Ghost Admin API.

    Args:
        headers: Unused; a fresh token is generated for every call. The
            parameter is kept so existing callers keep working.
        config: Configuration dict as returned by ``get_config``.
        server: Key of the server entry to use inside ``config``.
        post_slug: Slug of the post to fetch.

    Returns:
        The ``posts`` list of the API response, or an empty list when the
        server answers with a status other than 200.

    Raises:
        SystemExit: With status 1 when the server answers 401.
    """
    url = '{}/ghost/api/v3/admin/posts/slug/{}/'.format(
        config[server]['ghost_url'], post_slug)
    headers = authenticate(config, server)
    r = requests.get(url, headers=headers)
    if r.status_code == 401:
        logger.error("Cannot connect to server to get post")
        raise SystemExit(1)
    if r.status_code != 200:
        # Return an empty list so that callers can still iterate the result.
        logger.error(
            "Cannot get post %s (HTTP %s)", post_slug, r.status_code)
        return []
    r_post = json.loads(r.content)
    return r_post['posts']
def get_posts(headers, config, server, year=None, month=None):
    """Fetch every published post, most recent first, page by page.

    Args:
        headers: Request headers carrying the Admin API token.
        config: Configuration dict as returned by ``get_config``.
        server: Key of the server entry to use inside ``config``.
        year: Optional year used to restrict posts on ``published_at``.
        month: Optional month (1-12) narrowing the ``year`` restriction;
            ignored when ``year`` is not given.

    Returns:
        The list of posts collected over all pages. When a page request
        fails with a status other than 401, the posts collected so far are
        returned.

    Raises:
        SystemExit: With status 1 when the server answers 401.
    """
    all_posts = []
    ghost_url = config[server]['ghost_url']
    url = '{}/ghost/api/v3/admin/posts/'.format(ghost_url)
    payload = {
        'limit': '100',
        'order': 'published_at desc',
        'filter': 'status:published'
    }
    if year:
        # calendar.monthrange() needs integers; accept numeric strings too.
        year = int(year)
        first_month = int(month) if month else 1
        last_month = int(month) if month else 12
        last_day = calendar.monthrange(year, last_month)[1]
        payload['filter'] += '+published_at:>={}-{:02d}-01'.format(
            year, first_month
        )
        # NOTE(review): the upper bound is a bare date; confirm whether
        # Ghost includes posts published later during that last day.
        payload['filter'] += '+published_at:<={}-{:02d}-{}'.format(
            year, last_month, last_day
        )
    page = 1
    while page:
        payload['page'] = page
        r = requests.get(url, headers=headers, params=payload)
        if r.status_code == 401:
            logger.error("Cannot connect to server {}".format(ghost_url))
            raise SystemExit(1)
        if r.status_code != 200:
            # Without this exit the same page would be requested forever.
            logger.error(
                "Cannot get posts page %s (HTTP %s)", page, r.status_code)
            break
        posts = json.loads(r.content)
        pagination = posts['meta']['pagination']
        if page == 1:
            print('{} posts found ({} pages)'.format(
                pagination['total'],
                pagination['pages']
            ))
        all_posts += posts['posts']
        # ``next`` is the value the loop stops on once it becomes falsy.
        page = pagination['next']
    return all_posts
def extract_mobile_doc(mdoc_json):
    """Extract paragraph texts and image sources from a decoded mobiledoc.

    Args:
        mdoc_json: Decoded mobiledoc document (a dict with ``sections`` and
            ``cards`` entries; a missing entry is treated as empty).

    Returns:
        A ``(text, images)`` tuple: ``text`` lists the value of the first
        marker of each ``p`` section, ``images`` lists the distinct ``src``
        of image and gallery cards in order of first appearance.
    """
    text = []
    for section in mdoc_json.get('sections', []):
        # Only sections shaped like [1, 'p', markers] carry paragraph text.
        if len(section) < 3 or section[0] != 1 or section[1] != 'p':
            continue
        markers = section[2]
        # Keep the fourth item of the first marker when it is a 4-item
        # marker whose type flag is 0.
        if markers and len(markers[0]) == 4 and markers[0][0] == 0:
            text.append(markers[0][3])

    images = []
    seen = set()
    for card in mdoc_json.get('cards', []):
        if card[0] == 'image':
            sources = [card[1].get('src')]
        elif card[0] == 'gallery':
            sources = [k.get('src') for k in card[1].get('images', [])]
        else:
            continue
        for src in sources:
            # Cards without a usable source are skipped rather than
            # reported as an empty image path.
            if src and src not in seen:
                seen.add(src)
                images.append(src)
    return text, images
def create_gallery_row(row, images, image_path, d_year, d_month):
    """Build the gallery entries of one row.

    Relies on the module-level ``ghost_url`` set by the script entry point.

    Args:
        row: Row index stored in every entry.
        images: Image sources (URLs or paths); only the base name is used.
        image_path: Local images root directory.
        d_year: Year directory name.
        d_month: Month directory name.

    Returns:
        A list of dicts with ``fileName``, ``row``, ``src``, ``width`` and
        ``height``. Images that cannot be opened get a 1000x1000 size.
    """
    gal_images = []
    for image in images:
        img_name = clean_name(os.path.basename(image))
        src = '{}/{}/{}/{}'.format(image_path, d_year, d_month, img_name)
        try:
            # The context manager releases the file handle right away.
            with Image.open(src) as im:
                width, height = im.size
        except OSError:
            # OSError also covers unreadable or unidentified image files,
            # not only missing ones.
            print('Image {} not found --> Gallery broken'.format(src))
            width = 1000
            height = 1000
        gal_images.append({
            'fileName': img_name,
            'row': row,
            'src': '{}/content/images/{}/{}/{}'.format(
                ghost_url, d_year, d_month, img_name),
            'width': width,
            'height': height
        })
    return gal_images
def create_gallery(images, image_path, d_year, d_month):
    """Build a mobiledoc gallery card out of a list of image sources.

    Images 1-3 go to row 0, images 4-6 to row 1 and every remaining image
    to row 2.

    Returns:
        A ``['gallery', {'images': [...]}]`` card.
    """
    row_chunks = (images[:3], images[3:6], images[6:])
    entries = []
    for row_index, chunk in enumerate(row_chunks):
        # The first row is always built, the next ones only when populated.
        if row_index == 0 or chunk:
            entries += create_gallery_row(
                row_index, chunk, image_path, d_year, d_month)
    return ['gallery', {'images': entries}]
def create_mobiledoc_image(image, image_path, d_year, d_month):
    """Build a mobiledoc image card pointing at the dated images directory.

    Relies on the module-level ``ghost_url`` set by the script entry point.

    Args:
        image: Image source (URL or path); only the base name is used.
        image_path: Local images root directory.
        d_year: Year directory name.
        d_month: Month directory name.

    Returns:
        An ``['image', {'src', 'width', 'height'}]`` card. An image that
        cannot be opened gets a 1000x1000 size.
    """
    img_name = clean_name(os.path.basename(image))
    src = '{}/{}/{}/{}'.format(image_path, d_year, d_month, img_name)
    try:
        # The context manager releases the file handle right away.
        with Image.open(src) as im:
            width, height = im.size
    except OSError:
        # OSError also covers unreadable or unidentified image files,
        # not only missing ones.
        print('Image {} not found'.format(src))
        width = 1000
        height = 1000
    return [
        'image', {
            'src': '{}/content/images/{}/{}/{}'.format(
                ghost_url, d_year, d_month, img_name),
            'width': width,
            'height': height
        }]
def update_mobile_doc(mobiledoc, image_path, d_year, d_month):
    """Rewrite image and gallery cards to point at the dated directory.

    Args:
        mobiledoc: Mobiledoc document as a JSON string (an already decoded
            dict is accepted too and is modified in place).
        image_path: Local images root directory.
        d_year: Year directory name.
        d_month: Month directory name.

    Returns:
        The decoded mobiledoc (a dict) with its ``cards`` list rebuilt.
        Other card types are kept unchanged.
    """
    if isinstance(mobiledoc, (str, bytes)):
        mdoc_json = json.loads(mobiledoc)
    else:
        mdoc_json = mobiledoc
    new_cards = []
    for card in mdoc_json.get('cards', []):
        if card[0] == 'image' and card[1].get('src'):
            rebuilt = create_mobiledoc_image(
                card[1]['src'], image_path, d_year, d_month)
            # Start from the original payload so that fields such as the
            # caption or alt text survive the rewrite.
            payload = dict(card[1])
            payload.update(rebuilt[1])
            new_cards.append([rebuilt[0], payload])
        elif card[0] == 'gallery':
            old_images = card[1]['images']
            rebuilt = create_gallery(
                [k['src'] for k in old_images],
                image_path, d_year, d_month)
            new_images = rebuilt[1]['images']
            # Entries come back in the original order; carry over the
            # per-image fields the rebuild does not produce.
            if len(new_images) == len(old_images):
                merged_images = []
                for old, new in zip(old_images, new_images):
                    merged = dict(old)
                    merged.update(new)
                    merged_images.append(merged)
                new_images = merged_images
            payload = dict(card[1])
            payload['images'] = new_images
            new_cards.append([rebuilt[0], payload])
        else:
            # Includes image cards that have no source yet.
            new_cards.append(card)
    mdoc_json['cards'] = new_cards
    return mdoc_json
def update_featured_image(images, ghost_dir, d_year, d_month, f_image):
    """Choose the featured image file (without path) for a post.

    Args:
        images: Image sources of the post, in document order.
        ghost_dir: Ghost installation directory.
        d_year: Year directory name.
        d_month: Month directory name.
        f_image: Current featured image (URL or path), or a falsy value.

    Returns:
        A ``(file_name, changed)`` tuple. The current featured image is
        kept (``changed`` False) when its file exists in the dated
        directory. Otherwise the first landscape image is returned, or the
        first image when none is landscape (``changed`` True).
        ``(None, False)`` is returned when there is nothing to choose from.
    """
    post_dir = '{}/content/images/{}/{}'.format(ghost_dir, d_year, d_month)
    if f_image:
        f_image = os.path.basename(f_image)
        if f_image and Path('{}/{}'.format(post_dir, f_image)).exists():
            return f_image, False
    # Find first landscape image
    for image in images:
        raw_name = os.path.basename(image)
        cleaned = clean_name(raw_name)
        # The cleaned name is the one returned, so it is probed first; the
        # raw name is kept as a fallback for files not renamed yet.
        for candidate in dict.fromkeys((cleaned, raw_name)):
            try:
                with Image.open('{}/{}'.format(post_dir, candidate)) as im:
                    width, height = im.size
            except OSError:
                # Missing, unreadable or unidentified file: try the next one.
                continue
            if width > height:
                return cleaned, True
            break
    # Fallback (no landscape): return first image
    if images:
        return clean_name(os.path.basename(images[0])), True
    return None, False
def update(post, ghost_url, ghost_dir):
    """Point a post's images and featured image at its dated directory.

    The directory is derived from the year and month of ``published_at``.
    Files are not moved here: the expected moves are only logged.

    Args:
        post: Post dict as returned by the Admin API; modified in place.
        ghost_url: Base URL of the Ghost site.
        ghost_dir: Ghost installation directory.

    Returns:
        The updated post (``mobiledoc`` becomes a dict), or None when the
        post has no ``published_at`` date and cannot be processed.
    """
    published = post.get('published_at')
    if not published:
        # Typically a draft: there is no date to derive the directory from.
        logger.warning(
            "Post %s has no published_at date, skipped", post.get('slug'))
        return None
    d_time = datetime.strptime(published, "%Y-%m-%dT%H:%M:%S.%fZ")
    d_year = d_time.strftime("%Y")
    d_month = d_time.strftime("%m")
    _, images = extract_mobile_doc(json.loads(post['mobiledoc']))
    for image in images:
        local_img = image.replace(ghost_url, ghost_dir)
        theory_img = '{}/content/images/{}/{}/{}'.format(
            ghost_dir, d_year, d_month, os.path.basename(image))
        if local_img != theory_img:
            logger.info("mv {} -> {}".format(local_img, theory_img))
    # The post is rewritten unconditionally, whether or not a move is needed.
    post['mobiledoc'] = update_mobile_doc(
        post['mobiledoc'],
        '{}/content/images/'.format(ghost_dir),
        d_year, d_month)
    f_image, f_update = update_featured_image(
        images, ghost_dir, d_year, d_month,
        post.get('feature_image'))
    if f_update:
        post['feature_image'] = '{}/content/images/{}/{}/{}'.format(
            ghost_url, d_year, d_month, f_image)
    return post
def update_post(config, server, post):
    """Send the updated mobiledoc and featured image of a post to Ghost.

    Args:
        config: Configuration dict as returned by ``get_config``.
        server: Key of the server entry to use inside ``config``.
        post: Post dict providing ``id``, ``slug``, ``updated_at``,
            ``mobiledoc`` (dict or JSON string) and ``feature_image``.
    """
    # Use the URL of the requested server instead of a module-level global.
    url = '{}/ghost/api/v3/admin/posts/{}/'.format(
        config[server]['ghost_url'],
        post['id'])
    mobiledoc = post['mobiledoc']
    if not isinstance(mobiledoc, str):
        mobiledoc = json.dumps(mobiledoc, separators=(',', ':'))
    body = {'posts': [{
        'updated_at': post['updated_at'],
        'mobiledoc': mobiledoc,
        'feature_image': post['feature_image']
    }]}
    headers = authenticate(config, server)
    r = requests.put(url, json=body, headers=headers)
    if r.status_code == 200:
        print('{} updated'.format(post['slug']))
    else:
        # Failures used to go unnoticed.
        logger.error(
            "Cannot update post %s (HTTP %s): %s",
            post['slug'], r.status_code, r.text)
if __name__ == "__main__":
    # Without a configured handler the INFO-level "mv" messages are dropped.
    logging.basicConfig(level=logging.INFO)
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", type=str, required=True)
    parser.add_argument("--server", type=str, required=True)
    parser.add_argument("--post", type=str)
    # NOTE(review): --fimage is parsed but not used anywhere below.
    parser.add_argument("--fimage", type=str)
    option = parser.parse_args()
    config = get_config(option.config)
    headers = authenticate(config, option.server)
    # These two names must stay module-level globals: the card-building
    # helpers read ``ghost_url`` directly.
    ghost_url = config[option.server]['ghost_url']
    ghost_dir = config[option.server]['ghost_dir']
    if option.post:
        posts = get_post(headers, config, option.server, option.post)
    else:
        posts = get_posts(headers, config, option.server)
    # get_post() may yield nothing when the lookup fails.
    for post in posts or []:
        updated_post = update(post, ghost_url, ghost_dir)
        if updated_post:
            update_post(config, option.server, updated_post)