Script to move pictures at the right place and add feature image

Hello

I wrote a dozen Python scripts to help me migrate/merge/clean/import big blogs into Ghost.

One thing I don’t like in Ghost is that when you create a post, images are uploaded to the current year/month directory. I want images to be moved to the right directory (the one corresponding to the post’s published_at date).

Another thing is about feature_image. I don’t want to duplicate the image; I want to reuse one that is already in my post.

So, my goal is to write a script to:

  • move images to the right dir
  • update links in mobiledoc
  • auto setup feature image if not set with first landscape image (can be forced with specific image)
  • remove .o files (I want to keep originals elsewhere)

Here is the code I use. It is not complete/robust and you NEED to know python to use it (I will not provide support).

As I spent hundreds of hours on that, I just want to share this code to help you if you have the same need, or as inspiration if you want to do something equivalent in another language (e.g. JS). I would be very happy if somebody came up with something in JS that can be triggered automatically…

Create json file with:

{    "myblog": {
        "ghost_url": "http://yoururl.com",
        "ghost_dir": "/mnt/data/....",
        "admin_key": "YOUR_KEY"
    }
}

Save following code in update.py file and launch:

python update.py --config ghost-config.json --server myblog

import argparse
import json
import os
import re
import requests
import jwt
import logging
import calendar

from pathlib import Path
from datetime import datetime
from PIL import Image

# Module-level logger used by the functions below for their error and
# info messages.
logger = logging.getLogger(__name__)


def get_config(config_file):
    """Load the JSON configuration file and return its parsed content.

    Args:
        config_file: Path of the JSON file to read.

    Returns:
        The decoded JSON document. The rest of the script indexes it as
        ``config[server][...]``.

    Raises:
        SystemExit: With status 1 when the file cannot be opened or does
            not hold valid UTF-8 encoded JSON.
    """
    try:
        # Read as UTF-8 explicitly so decoding does not depend on the locale.
        with open(config_file, encoding='utf-8') as json_file:
            return json.load(json_file)
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        logger.error("Incorrect or missing JSON config file")
        # SystemExit is raised directly: the exit() builtin is only injected
        # by the site module and is not guaranteed to exist.
        raise SystemExit(1)


def authenticate(config, server):
    """Build the Authorization header for a Ghost Admin API request.

    Args:
        config: Parsed configuration; ``config[server]['admin_key']`` must
            hold a key of the form ``<id>:<hex secret>``.
        server: Name of the server entry to use in ``config``.

    Returns:
        A dict with a single ``Authorization`` entry carrying a freshly
        signed HS256 token.

    Raises:
        ValueError: If the admin key does not split into exactly two parts
            on ``:`` or the secret is not valid hexadecimal.
    """
    token_limit = 15  # token lifetime, in minutes

    admin_key = config[server]['admin_key']

    admin_id, admin_secret = admin_key.split(':')
    iat = int(datetime.now().timestamp())

    header = {'alg': 'HS256', 'typ': 'JWT', 'kid': admin_id}
    payload = {
        'iat': iat,
        'exp': iat + token_limit * 60,
        'aud': '/v3/admin/'
    }

    token = jwt.encode(
        payload, bytes.fromhex(admin_secret),
        algorithm='HS256', headers=header)

    # PyJWT releases before 2.0 return bytes; formatting those directly
    # would produce "Ghost b'...'" and the request would be rejected.
    if isinstance(token, bytes):
        token = token.decode('ascii')

    return {'Authorization': 'Ghost {}'.format(token)}


def clean_name(img):
    """Normalise an image file name.

    Removes a ``-<width>x<height>`` size suffix and/or a ``-<digit>``
    suffix located right before a dot, then rewrites a trailing ``.jpg`` /
    ``.JPG`` extension to ``.jpeg``.

    Args:
        img: File name (or path) to normalise.

    Returns:
        The normalised name.
    """
    # Size followed by a single-digit suffix, e.g. "photo-300x200-1.png".
    img = re.sub(r'-[0-9]+x[0-9]+-[0-9]\.', '.', img)
    # Size alone, e.g. "photo-300x200.png". Digits are required on both
    # sides of the "x" so that names such as "linux-x.png" are left alone.
    img = re.sub(r'-[0-9]+x[0-9]+\.', '.', img)
    # Single-digit suffix alone, e.g. "photo-2.png".
    img = re.sub(r'-[0-9]\.', '.', img)

    # The dot is escaped and the pattern anchored so that only a real
    # extension is rewritten (an unescaped "." matches any character and
    # would mangle names that merely contain "jpg").
    img = re.sub(r'\.(?:JPG|jpg)$', '.jpeg', img)
    return img


def get_post(headers, config, server, post_slug):
    """Fetch one post, looked up by slug, from the Ghost Admin API.

    Args:
        headers: Ignored; a fresh Authorization header is generated for
            the request. Kept so existing callers keep working.
        config: Parsed configuration (see ``get_config``).
        server: Name of the server entry to use in ``config``.
        post_slug: Slug of the post to retrieve.

    Returns:
        The ``posts`` list of the API response, or an empty list when the
        server answers with a status other than 200 or 401.

    Raises:
        SystemExit: With status 1 when the server answers 401.
    """
    url = '{}/ghost/api/v3/admin/posts/slug/{}/'.format(
        config[server]['ghost_url'], post_slug)

    headers = authenticate(config, server)
    r = requests.get(url, headers=headers)

    if r.status_code == 401:
        logger.error("Cannot connect to server to get post")
        raise SystemExit(1)

    if r.status_code == 200:
        return json.loads(r.content)['posts']

    # Any other status (e.g. unknown slug): report it and hand back an
    # empty list so callers that iterate over the result do not crash.
    logger.error("Cannot get post {} (HTTP status {})".format(
        post_slug, r.status_code))
    return []


def get_posts(headers, config, server, year=None, month=None):
    """Fetch all published posts, page by page, from the Ghost Admin API.

    Args:
        headers: HTTP headers (Authorization) sent with every request.
        config: Parsed configuration (see ``get_config``).
        server: Name of the server entry to use in ``config``.
        year: Optional year used to restrict the ``published_at`` range.
        month: Optional month; only used together with ``year``. Without
            it the whole year is requested.

    Returns:
        The concatenated ``posts`` lists of every page, in the order
        returned by the server.

    Raises:
        SystemExit: With status 1 when the server answers with any status
            other than 200.
    """
    all_posts = []
    ghost_url = config[server]['ghost_url']

    url = '{}/ghost/api/v3/admin/posts/'.format(ghost_url)

    payload = {
        'limit': '100',
        'order': 'published_at desc',
        'filter': 'status:published'
    }

    if year:
        # calendar.monthrange needs an int; accept "2020" as well as 2020.
        year = int(year)
        if month:
            min_month = max_month = str(month).zfill(2)
        else:
            min_month, max_month = '01', '12'

        last_day = calendar.monthrange(year, int(max_month))[1]

        payload['filter'] += '+published_at:>={}-{}-01'.format(
            year, min_month
        )
        # NOTE(review): the upper bound is a bare date; confirm against the
        # Ghost filter semantics that posts published during that last day
        # are included.
        payload['filter'] += '+published_at:<={}-{}-{}'.format(
            year, max_month, last_day
        )

    page = 1
    while page:
        payload['page'] = page
        r = requests.get(url, headers=headers, params=payload)

        if r.status_code == 401:
            logger.error("Cannot connect to server {}".format(ghost_url))
            raise SystemExit(1)

        if r.status_code != 200:
            # Without this guard `page` would never change and the same
            # request would be retried forever.
            logger.error("Unexpected HTTP status {} from server {}".format(
                r.status_code, ghost_url))
            raise SystemExit(1)

        posts = json.loads(r.content)
        pagination = posts['meta']['pagination']
        if page == 1:
            print('{} posts found ({} pages)'.format(
                pagination['total'],
                pagination['pages']
            ))

        all_posts += posts['posts']
        # A falsy 'next' value ends the loop.
        page = pagination['next']

    return all_posts


def extract_mobile_doc(mdoc_json):
    """Collect paragraph texts and image sources from a parsed mobiledoc.

    Args:
        mdoc_json: Mobiledoc document as a dict with ``sections`` and
            ``cards`` entries.

    Returns:
        A ``(text, images)`` tuple. ``text`` holds the fourth item of the
        first marker of every ``p`` markup section whose first marker has
        four items and starts with 0. ``images`` holds the ``src`` of every
        image card and gallery image, without duplicates, in order of
        first appearance.
    """
    text = [
        section[2][0][3]
        for section in mdoc_json['sections']
        if section[0] == 1 and section[1] == 'p'
        and len(section[2]) > 0
        and len(section[2][0]) == 4
        and section[2][0][0] == 0
    ]

    images = []
    for card in mdoc_json['cards']:
        card_type = card[0]
        if card_type == 'image':
            sources = [card[1]['src']]
        elif card_type == 'gallery':
            sources = [entry['src'] for entry in card[1]['images']]
        else:
            continue

        for source in sources:
            if source not in images:
                images.append(source)

    return text, images


def create_gallery_row(row, images, image_path, d_year, d_month):
    """Build the gallery entries for one row of a mobiledoc gallery card.

    Relies on the module-level ``ghost_url`` (assigned in the ``__main__``
    section of this script) to build the public image URL.

    Args:
        row: Row index stored in every entry.
        images: Image sources (URLs or paths); only the base name is used.
        image_path: Local directory containing the ``<year>/<month>``
            image folders.
        d_year: Year folder name.
        d_month: Month folder name.

    Returns:
        A list with one dict per image holding ``fileName``, ``row``,
        ``src``, ``width`` and ``height``. When the local file is missing
        a message is printed and 1000x1000 is used as a placeholder size.
    """
    gal_images = []
    for image in images:
        img_name = os.path.basename(image)
        img_name = clean_name(img_name)
        src = '{}/{}/{}/{}'.format(image_path, d_year, d_month, img_name)

        try:
            # Context manager so the underlying file handle is released as
            # soon as the size has been read.
            with Image.open(src) as im:
                width, height = im.size
        except FileNotFoundError:
            print('Image {} not found --> Gallery broken'.format(src))
            width = 1000
            height = 1000

        gal_images.append({
            'fileName': img_name,
            'row': row,
            'src': '{}/content/images/{}/{}/{}'.format(
                ghost_url, d_year, d_month, img_name),
            'width': width,
            'height': height
        })
    return gal_images


def create_gallery(images, image_path, d_year, d_month):
    """Build a mobiledoc gallery card from a list of image sources.

    The first three images go on row 0, the next three on row 1 and all
    remaining ones on row 2.

    Args:
        images: Image sources, in display order.
        image_path: Local directory containing the ``<year>/<month>``
            image folders.
        d_year: Year folder name.
        d_month: Month folder name.

    Returns:
        ``['gallery', {'images': [...]}]`` where the list holds the entries
        produced by ``create_gallery_row`` for each row.
    """
    count = len(images)

    # Row 0 is always built, even for an empty image list.
    layout = [(0, images[:3])]
    if count >= 4:
        layout.append((1, images[3:6]))
    if count >= 7:
        layout.append((2, images[6:]))

    entries = []
    for row_index, row_images in layout:
        entries.extend(
            create_gallery_row(
                row_index, row_images, image_path, d_year, d_month))

    return ['gallery', {'images': entries}]


def create_mobiledoc_image(image, image_path, d_year, d_month):
    """Build a mobiledoc image card pointing at the dated image folder.

    Relies on the module-level ``ghost_url`` (assigned in the ``__main__``
    section of this script) to build the public image URL.

    Args:
        image: Image source (URL or path); only the base name is used.
        image_path: Local directory containing the ``<year>/<month>``
            image folders.
        d_year: Year folder name.
        d_month: Month folder name.

    Returns:
        ``['image', {'src': ..., 'width': ..., 'height': ...}]``. When the
        local file is missing a message is printed and 1000x1000 is used
        as a placeholder size.
    """
    img_name = os.path.basename(image)
    img_name = clean_name(img_name)
    src = '{}/{}/{}/{}'.format(image_path, d_year, d_month, img_name)

    try:
        # Context manager so the underlying file handle is released as soon
        # as the size has been read.
        with Image.open(src) as im:
            width, height = im.size
    except FileNotFoundError:
        print('Image {} not found'.format(src))
        width = 1000
        height = 1000

    return [
        'image', {
            'src': '{}/content/images/{}/{}/{}'.format(
                ghost_url, d_year, d_month, img_name),
            'width': width,
            'height': height
        }]


def update_mobile_doc(mobiledoc, image_path, d_year, d_month):
    """Rewrite image and gallery cards so they point at the dated folder.

    Args:
        mobiledoc: Mobiledoc document as a JSON string.
        image_path: Local directory containing the ``<year>/<month>``
            image folders.
        d_year: Year folder name.
        d_month: Month folder name.

    Returns:
        The parsed mobiledoc (a dict, not a string) whose ``cards`` list has
        been rebuilt. Image and gallery cards get the values computed by
        ``create_mobiledoc_image`` / ``create_gallery``; every other payload
        field they already had is kept. Other cards are left untouched.
    """
    mdoc_json = json.loads(mobiledoc)
    new_cards = []

    for card in mdoc_json['cards']:
        if card[0] == 'image':
            rebuilt = create_mobiledoc_image(
                card[1]['src'], image_path, d_year, d_month)
            # Start from the existing payload so fields the rebuilt card
            # does not know about (caption, alt, ...) are not lost.
            payload = dict(card[1])
            payload.update(rebuilt[1])
            new_cards.append([rebuilt[0], payload])
        elif card[0] == 'gallery':
            old_images = card[1]['images']
            rebuilt = create_gallery(
                [k['src'] for k in old_images],
                image_path, d_year, d_month)
            new_images = rebuilt[1]['images']
            # Merge per image only when both lists line up one to one;
            # otherwise fall back to the rebuilt entries as they are.
            if len(new_images) == len(old_images):
                new_images = [
                    dict(old, **new)
                    for old, new in zip(old_images, new_images)
                ]
            payload = dict(card[1])
            payload['images'] = new_images
            new_cards.append([rebuilt[0], payload])
        else:
            new_cards.append(card)

    mdoc_json['cards'] = new_cards

    return mdoc_json


def update_featured_image(images, ghost_dir, d_year, d_month, f_image):
    """Choose the feature image of a post.

    Args:
        images: Image sources found in the post, in order of appearance.
        ghost_dir: Root directory of the Ghost installation.
        d_year: Year folder name.
        d_month: Month folder name.
        f_image: Current feature image (URL or path), or a falsy value when
            none is set.

    Returns:
        A ``(file_name, changed)`` tuple. ``file_name`` has no directory
        part. ``changed`` is False when the current feature image already
        exists in the dated folder (it is kept) or when no image is
        available (``file_name`` is then None); it is True when a post
        image was selected: the first landscape one, else the first image.
    """
    image_dir = '{}/content/images/{}/{}'.format(ghost_dir, d_year, d_month)

    if f_image:
        f_image = os.path.basename(f_image)
        if Path('{}/{}'.format(image_dir, f_image)).exists():
            return f_image, False

    # Find first landscape image
    for image in images:
        name = os.path.basename(image)
        cleaned = clean_name(name)
        # The cleaned name is what gets returned, so also probe it when the
        # raw name is not on disk (dict.fromkeys drops the duplicate when
        # both names are identical).
        for candidate in dict.fromkeys((name, cleaned)):
            try:
                with Image.open('{}/{}'.format(image_dir, candidate)) as im:
                    width, height = im.size
            except FileNotFoundError:
                continue
            if width / height > 1:
                return cleaned, True
            # File found but not landscape: move on to the next image.
            break

    # Fallback (no landscape): return first image
    if len(images) > 0:
        return clean_name(os.path.basename(images[0])), True
    return None, False


def update(post, ghost_url, ghost_dir):
    """Rewrite a post so its images point at its publication-date folder.

    The target folder is ``content/images/<year>/<month>`` derived from the
    post's ``published_at`` value. This function does not move any file: it
    only logs the expected moves and updates the post data in memory.

    Args:
        post: Post dict as returned by the Ghost Admin API. Its
            ``mobiledoc`` entry is replaced by the rebuilt, parsed document
            and ``feature_image`` may be replaced.
        ghost_url: Public base URL of the blog.
        ghost_dir: Root directory of the Ghost installation.

    Returns:
        The modified ``post``, or None when the post has no
        ``published_at`` date (nothing can be computed for it).
    """
    d = post.get('published_at')
    if not d:
        # E.g. a draft fetched by slug: there is no date to derive the
        # target folder from, so skip the post and do not crash.
        print('{} has no published_at date --> skipped'.format(
            post.get('slug')))
        return None

    d_time = datetime.strptime(d, "%Y-%m-%dT%H:%M:%S.%fZ")
    d_year = d_time.strftime("%Y")
    d_month = d_time.strftime("%m")

    text, images = extract_mobile_doc(
        json.loads(post['mobiledoc']))

    # Report every image whose current location differs from the expected
    # one. Informational only: nothing is moved here.
    for image in images:
        local_img = image.replace(ghost_url, ghost_dir)
        theory_img = '{}/content/images/{}/{}/{}'.format(
            ghost_dir, d_year, d_month, os.path.basename(image))
        if local_img != theory_img:
            logger.info("mv {} -> {}".format(
                local_img, theory_img))

    # The mobiledoc is always rebuilt, whether or not a move was reported.
    post['mobiledoc'] = update_mobile_doc(
        post['mobiledoc'],
        '{}/content/images/'.format(ghost_dir),
        d_year, d_month)

    f_image, f_update = update_featured_image(
        images, ghost_dir, d_year, d_month,
        post['feature_image'])
    if f_update:
        post['feature_image'] = '{}/content/images/{}/{}/{}'.format(
            ghost_url, d_year, d_month, f_image)

    return post


def update_post(config, server, post):
    """Send the updated mobiledoc and feature image of a post to Ghost.

    Args:
        config: Parsed configuration (see ``get_config``).
        server: Name of the server entry to use in ``config``.
        post: Post dict holding ``id``, ``slug``, ``updated_at``,
            ``mobiledoc`` (parsed document or JSON string) and
            ``feature_image``.

    Returns:
        None. A message is printed on success; a failure is logged.
    """
    # Read the URL from the configuration, like get_post does, and do not
    # depend on a module-level global.
    url = '{}/ghost/api/v3/admin/posts/{}/'.format(
        config[server]['ghost_url'],
        post['id'])

    mobiledoc = post['mobiledoc']
    if not isinstance(mobiledoc, str):
        # The API expects the mobiledoc as a JSON string; an already
        # serialised document is sent as is to avoid double encoding.
        mobiledoc = json.dumps(mobiledoc, separators=(',', ':'))

    body = {'posts': [{
        'updated_at': post['updated_at'],
        'mobiledoc': mobiledoc,
        'feature_image': post['feature_image']
    }]}

    headers = authenticate(config, server)
    r = requests.put(url, json=body, headers=headers)
    if r.status_code == 200:
        print('{} updated'.format(post['slug']))
    else:
        logger.error('{} not updated (HTTP status {})'.format(
            post['slug'], r.status_code))


if __name__ == "__main__":
    # Command-line entry point: load the configuration, fetch one post
    # (--post <slug>) or all published posts, then rewrite and upload each.

    parser = argparse.ArgumentParser()
    parser.add_argument("--config", type=str, required=True)
    parser.add_argument("--server", type=str, required=True)
    parser.add_argument("--post", type=str)
    # NOTE(review): --fimage is parsed but never read below; confirm how it
    # is meant to force the feature image.
    parser.add_argument("--fimage", type=str)

    option = parser.parse_args()

    config = get_config(option.config)

    headers = authenticate(config, option.server)

    # Module-level names: some helper functions read ghost_url as a global.
    ghost_url = config[option.server]['ghost_url']
    ghost_dir = config[option.server]['ghost_dir']

    if option.post:
        posts = get_post(headers, config, option.server, option.post)
    else:
        posts = get_posts(headers, config, option.server)

    # Guard against a helper handing back None instead of a list.
    for post in posts or []:
        updated_post = update(post, ghost_url, ghost_dir)
        if updated_post:
            update_post(config, option.server, updated_post)
1 Like